Skip to content

Instantly share code, notes, and snippets.

@yriveiro
Forked from chaimpeck/copy-bucket.py
Created August 30, 2018 15:13
Show Gist options
  • Save yriveiro/d3bf0b67f88d3a019eb6e969eeaca1c3 to your computer and use it in GitHub Desktop.
Save yriveiro/d3bf0b67f88d3a019eb6e969eeaca1c3 to your computer and use it in GitHub Desktop.
Copy a Riak Bucket from one server to another
#!/usr/bin/env python
"""Copy the contents of one Riak bucket into another bucket, optionally on a different server."""
import sys
from optparse import OptionParser
import fileinput
import logging
from riak import RiakClient
from multiprocessing import Pool
# Default parameters used by the option parser in get_options()
DEFAULT_FROM_HOST = "localhost"
# Both bucket names default to the empty string
DEFAULT_FROM_BUCKET = ""
DEFAULT_TO_HOST = "localhost"
DEFAULT_TO_BUCKET = ""
# Default for --threads, which main() passes to Pool(processes=...)
DEFAULT_THREADS = 10
# Set up logging: a module logger with a single stream handler that prefixes
# each message with a timestamp and level name. No level is chosen here;
# set_log_level() sets it once the command line has been parsed.
log = logging.getLogger(__name__)
hdlr = logging.StreamHandler()
log_formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(log_formatter)
log.addHandler(hdlr)
def copy_obj(key):
    """Copy one object from the source bucket to the destination bucket.

    Reads the module-level ``from_bucket`` and ``to_bucket`` globals, which
    ``main`` assigns before it creates the worker pool.

    Args:
        key: Key of the object to fetch from ``from_bucket``; the copy is
            stored under the same key in ``to_bucket``.
    """
    from_obj = from_bucket.get(key)
    # Carry the source content type across. Without it the new object gets
    # the client's default content type, so a non-JSON value would be stored
    # under the wrong type.
    to_obj = to_bucket.new(
        key, from_obj.data, content_type=from_obj.content_type
    )
    to_obj.usermeta = from_obj.usermeta
    to_obj.links = from_obj.links
    to_obj.indexes = from_obj.indexes
    # The stored body is not needed back; skip returning it.
    to_obj.store(return_body=False)
def set_log_level(verbosity):
    """Set the module logger's level from a verbosity count.

    Args:
        verbosity: How many times the verbose flag was given. ``None`` or
            ``0`` selects WARNING, ``1`` selects INFO, and ``2`` or more
            selects DEBUG.
    """
    # A count-style option that was never passed can arrive as None. Treat
    # that as 0 rather than relying on None/int ordering, which is a
    # TypeError on Python 3.
    verbosity = verbosity or 0
    if verbosity >= 2:
        log_level = logging.DEBUG
    elif verbosity == 1:
        log_level = logging.INFO
    else:
        log_level = logging.WARNING  # default
    log.setLevel(level=log_level)
def get_options():
    """Parse the options passed to this script on the command line.

    Returns:
        The ``(options, args)`` tuple produced by ``OptionParser.parse_args``.
        ``options.verbose`` is an integer count (0 when ``-v`` is absent).
    """
    # Positional arguments are not used by the script, so the usage line
    # lists options only.
    parser = OptionParser("usage: %prog [options]")
    parser.add_option("-f", "--from-host", dest="from_host",
                      default=DEFAULT_FROM_HOST,
                      help="From host [default: %default]")
    parser.add_option("-b", "--from-bucket", dest="from_bucket",
                      default=DEFAULT_FROM_BUCKET,
                      help="From bucket [default: %default]")
    parser.add_option("-t", "--to-host", dest="to_host",
                      default=DEFAULT_TO_HOST,
                      help="To host [default: %default]")
    parser.add_option("-B", "--to-bucket", dest="to_bucket",
                      default=DEFAULT_TO_BUCKET,
                      help="To bucket [default: %default]")
    parser.add_option("-T", "--threads", dest="threads",
                      default=DEFAULT_THREADS, type="int",
                      help="Number of threads [default: %default]")
    # default=0 keeps the count an integer even when -v is never given;
    # without it the parsed value would be None.
    parser.add_option('-v', '--verbose', dest='verbose', action='count',
                      default=0,
                      help="Increase verbosity (specify multiple times for more)")
    return parser.parse_args()
def main(args):
    """Copy every object of one Riak bucket into another, then verify.

    Keys are streamed from the source bucket, each copy is queued on a
    worker pool, and once all copies finish the destination bucket's keys
    are counted and compared with the number queued.

    Args:
        args: Unused; options are parsed from ``sys.argv`` by
            ``get_options``. Kept so existing callers keep working.

    Returns:
        0 if the destination key count equals the number of keys copied,
        1 otherwise (suitable for ``sys.exit``).

    Raises:
        multiprocessing.TimeoutError: If a copy does not finish within 10
            seconds of being waited on. Exceptions raised inside a worker
            are re-raised here as well.
    """
    global to_bucket
    global from_bucket

    options, _ = get_options()
    set_log_level(options.verbose)
    log.debug("Starting with options: %s", options)

    from_nodes = [{'host': node} for node in options.from_host.split(',')]
    to_nodes = [{'host': node} for node in options.to_host.split(',')]

    # The key listing uses its own client, separate from the one used for
    # fetching objects (presumably so the open stream does not share a
    # connection with the fetches -- confirm against the Riak client docs).
    streaming_riak = RiakClient(nodes=from_nodes, protocol='pbc')
    from_riak = RiakClient(nodes=from_nodes, protocol='pbc')
    to_riak = RiakClient(nodes=to_nodes, protocol='pbc')

    streaming_bucket = streaming_riak.bucket(options.from_bucket)
    from_bucket = from_riak.bucket(options.from_bucket)
    to_bucket = to_riak.bucket(options.to_bucket)

    # Create the pool only after the bucket globals are assigned: copy_obj
    # runs in the workers and reads them (assumes workers are forked and so
    # inherit this module's globals).
    pool = Pool(processes=options.threads)
    finished = False
    try:
        # For each streamed key, queue a copy of its object to the new bucket.
        results = []
        for keys in streaming_bucket.stream_index('$bucket', '_'):
            for key in keys:
                results.append(pool.apply_async(copy_obj, [key]))
                sys.stdout.write("\rQueuing %d" % len(results))
                sys.stdout.flush()
        sys.stdout.write("\n")

        total_keys = len(results)
        # Trailing spaces overwrite leftover digits as the counter shrinks.
        buffer_len = len("%d" % total_keys)
        remaining = total_keys
        for result in results:
            remaining -= 1
            result.get(timeout=10)
            sys.stdout.write(
                "\rWaiting for %d operations to finish" % remaining
                + " " * buffer_len
            )
            sys.stdout.flush()
        sys.stdout.write("\n")
        finished = True
    finally:
        # Always release the workers: shut down cleanly on success, and
        # abandon any still-queued copies if something went wrong.
        if finished:
            pool.close()
        else:
            pool.terminate()
        pool.join()

    # Confirm. Stream all keys now present in the destination bucket.
    found = 0
    for keys in to_bucket.stream_index('$bucket', '_'):
        for _key in keys:
            found += 1
            sys.stdout.write("\rConfirming... Found %d keys" % found)
            sys.stdout.flush()
    sys.stdout.write("\n")

    if found == total_keys:
        print("Total number of keys match. Copy is complete")
        return 0
    print("WARNING: Total number of keys do not match. Problem copying")
    return 1
if __name__ == '__main__':
    # Run the copy and hand main()'s return value to the interpreter as the
    # process exit status.
    raise SystemExit(main(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment