Skip to content

Instantly share code, notes, and snippets.

@anandnalya
Forked from xyu/kafka-rebalance.py
Created January 26, 2017 06:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anandnalya/a69fc036fff249a61418a8de6b1a0b46 to your computer and use it in GitHub Desktop.
Save anandnalya/a69fc036fff249a61418a8de6b1a0b46 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import argparse
import json
import random
import sys
def get_topic( args, data ):
topic = data['partitions'][0]['topic']
for partition in data['partitions']:
if topic != partition['topic']:
raise ValueError( 'Rebalance only works on one topic at a time' )
return topic
def get_brokers( args, data ):
brokers = set()
if args.brokers:
for broker in args.brokers.split( ',' ):
brokers.add( int( broker ) )
else:
for partition in data['partitions']:
for broker in partition['replicas']:
brokers.add( broker )
return brokers
def get_target_replicas( args, data ):
if args.replicas:
replicas = args.replicas
else:
replicas = get_current_replicas( args, data )
return replicas
def get_current_replicas( args, data ):
num_replicas = set()
for partition in data['partitions']:
num_replicas.add( len( partition['replicas'] ) )
return max( num_replicas )
def get_broker_list( data, replicas, brokers ):
broker_list = []
per_broker = len( data['partitions'] ) * replicas / len( brokers )
for broker in brokers:
broker_list += [ broker ] * per_broker
return broker_list
def rebalance( data, replicas, broker_list, verbose ):
random.shuffle( broker_list )
new_partitions = []
for partition in data['partitions']:
i = 0
new_partition = partition.copy()
new_partition['replicas'] = []
while True:
# Ran out of brokers that can be assigned
if len( broker_list ) == i:
return None
# Don't use if matches any existing broker
if broker_list[i] in partition['replicas']:
i += 1
continue
# Don't use if matches already selected new broker
if broker_list[i] in new_partition['replicas']:
i += 1
continue
new_partition['replicas'] += [ broker_list[i] ]
broker_list = broker_list[0:i] + broker_list[1+i:]
i=0
# Found all brokers for this partition
if len( new_partition['replicas'] ) == replicas:
break
if verbose:
print "Partition " + str( partition['partition'] ) + "\t" + str( partition['replicas'] ) + " -> " + str( new_partition['replicas'] )
new_partitions += [ new_partition ]
return new_partitions
# CLI args
parser = argparse.ArgumentParser()
parser.add_argument(
"assignment", type=str,
help="current partition replica assignment (JSON)"
)
parser.add_argument(
"--brokers", type=str,
help="override brokers to use"
)
parser.add_argument(
"--replicas", type=int,
help="override number of replicas after rebalance"
)
parser.add_argument(
"-v", "--verbose",
help="increase output verbosity", action="store_true"
)
args = parser.parse_args()
data = json.loads( args.assignment )
# Check we have enough brokers to handle rebalance
if get_target_replicas( args, data ) + get_current_replicas( args, data ) > len( get_brokers( args, data ) ):
raise ValueError( 'Cannot rebalance, not enough brokers' )
topic = get_topic( args, data )
replicas = get_target_replicas( args, data )
brokers = get_brokers( args, data )
broker_list = get_broker_list( data, replicas, brokers )
# Try to assign 100 times then giveup
print "Attempting to rebalance " + topic
for i in range( 0, 1800 ):
if args.verbose:
print "Try " + str( i )
else:
sys.stdout.write('.')
if 74 == i%75:
print ""
new_partitions = rebalance( data, replicas, broker_list, args.verbose )
if None != new_partitions:
break
if None == new_partitions:
raise RuntimeError( 'Could not rebalance partitions' )
print "\nDone!\n"
print json.dumps(
{ "version" : 1, "partitions" : new_partitions },
separators=( ',', ':' )
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment