Skip to content

Instantly share code, notes, and snippets.

@code6
Created June 13, 2014 09:44
Show Gist options
  • Save code6/570adc81ebb85231eab1 to your computer and use it in GitHub Desktop.
Save code6/570adc81ebb85231eab1 to your computer and use it in GitHub Desktop.
Script that generates the mdrill.task.assignment configuration.
python gen_task_assignment_conf.py -m 10 -s 60 --hosts $hosts --ports $ports
#coding=utf8
import os
import sys
# Python 2-only idiom: re-load sys (presumably so setdefaultencoding() is
# reachable again) and make UTF-8 the process-wide default encoding.
# NOTE(review): widely considered a hack; the builtin reload() and
# sys.setdefaultencoding do not exist in Python 3 - remove when porting.
reload(sys)
sys.setdefaultencoding('utf-8')
import argparse
import random
def assign_task(mergelist, shardlist, hosts, ports):
    """Assign every (host, port) worker slot to exactly one merge/shard task.

    Rules the assignment follows:
      R1: '__acker:0', 'heartbeat:0' and 'merge:0' share the same slots.
      R2: no (host, port) slot is given to more than one merge/shard task.
      R3: slots are dealt out in shuffled rounds, one slot per task per
          round; within a round, a task not yet running on the current
          host is preferred so tasks spread across hosts.

    Args:
        mergelist: merge task names; must contain 'merge:0'.
        shardlist: shard task names.
        hosts: supervisor host names.
        ports: worker ports (every port is used on every host).

    Returns:
        dict mapping task name -> list of (host, port) tuples, plus
        '__acker:0' and 'heartbeat:0' entries that copy 'merge:0'.

    Raises:
        AttributeError: if there are more tasks than (host, port) slots.
        KeyError: if mergelist does not contain 'merge:0'.
    """
    host_ports = [(h, p) for h in hosts for p in ports]
    if len(mergelist) + len(shardlist) > len(host_ports):
        raise AttributeError(
            "len(mergelist) (%d) + len(shardlist) (%d) is larger then len(host_ports) (%d)"
            % (len(mergelist), len(shardlist), len(host_ports)))
    if 'merge:0' not in mergelist:
        # R1 needs 'merge:0'; checking here also avoids a modulo-by-zero
        # below when there are no tasks at all.
        raise KeyError('merge:0')

    candidates = mergelist + shardlist  # new list: shuffling never touches the inputs
    size = len(candidates)
    result = dict((name, []) for name in candidates)
    used_hosts = dict((name, set()) for name in candidates)
    unassigned = []
    for pos, hp in enumerate(host_ports):
        host = hp[0]
        if pos % size == 0:
            # New round: reshuffle and give every task one more slot.
            random.shuffle(candidates)
            unassigned = candidates[:]
        candi = candidates[pos % size]
        if candi not in unassigned or host in used_hosts[candi]:
            # Prefer a task that is still free this round and does not yet
            # run on this host.
            for other in unassigned:
                if other != candi and host not in used_hosts[other]:
                    candi = other
                    break
            else:
                # No better choice. Keep candi if it is still free this round,
                # otherwise take any free task. `unassigned` is never empty
                # here: a round has at most `size` slots and each removes one.
                if candi not in unassigned:
                    candi = unassigned[0]
        result[candi].append(hp)
        used_hosts[candi].add(host)
        unassigned.remove(candi)
    result['__acker:0'] = result['merge:0'][:]
    result['heartbeat:0'] = result['merge:0'][:]
    return result
def format_hostport_list(hplist):
    """Render (host, port) pairs as 'host:port,port;host:port,...'.

    Consecutive pairs that share a host are merged into one 'host:ports'
    group (ports joined with ','), and groups are joined with ';'. Input
    order is preserved, so a host that reappears after a different host
    starts a new group. Ports must be strings (they go through str.join).
    Returns '' for an empty list.
    """
    groups = []  # (host, [port, ...]) for each run of equal consecutive hosts
    for host, port in hplist:
        # Compare against the last run directly instead of a truthiness
        # check, so a falsy host such as '' is not silently dropped.
        if groups and groups[-1][0] == host:
            groups[-1][1].append(port)
        else:
            groups.append((host, [port]))
    return ";".join("%s:%s" % (host, ",".join(ports)) for host, ports in groups)
def format_service_config(service, hplist):
    """Return one quoted config list item for *service*, of the form

        - "merge:0;host-a:9601,9602;host-b:9601"

    The part after the first ';' comes from format_hostport_list(hplist).
    """
    fields = {'service': service, 'hpstr': format_hostport_list(hplist)}
    return "- \"%(service)s;%(hpstr)s\"" % fields
def _split_csv(value):
    """Split a comma-separated CLI value, trimming spaces and dropping empty items."""
    return [item.strip() for item in value.split(',') if item.strip()]


def main(args):
    """Build the task lists described by *args* and print one config line per service.

    *args* is the namespace returned by parse_args(): ``hosts``/``ports`` are
    comma-separated strings; ``merge``, ``shard`` and ``replication`` are counts.
    Services are printed in sorted name order.
    """
    # Trim and drop empties so "h1, h2" or a trailing comma do not yield
    # bogus hosts/ports such as ' h2' or ''.
    hosts = _split_csv(args.hosts)
    ports = _split_csv(args.ports)
    mergelist = ['merge:%s' % i for i in range(args.merge)]
    # Names are 'shard@<replica>:<shard>'; replicas of one shard are adjacent.
    shardlist = ['shard@%s:%s' % (j, i)
                 for i in range(args.shard)
                 for j in range(args.replication)]
    result = assign_task(mergelist, shardlist, hosts, ports)
    for service in sorted(result):
        # print(...) with a single argument behaves the same on Python 2 and 3.
        print(format_service_config(service, result[service]))
def parse_args():
    """Parse sys.argv for the generator.

    Options: -m/--merge (default 10), -s/--shard (default 60),
    -r/--replication (default 1), --hosts and --ports (comma-separated).
    Raises AttributeError if --ports or --hosts is missing or empty
    (ports are checked first).
    """
    parser = argparse.ArgumentParser(description='gen task assignment config')
    option_specs = (
        (('--merge', '-m'), dict(help='merge server count', type=int, default=10)),
        (('--shard', '-s'), dict(help='shard server count', type=int, default=60)),
        (('--replication', '-r'), dict(help='shard replication count', type=int, default=1)),
        (('--hosts',), dict(help='supervisor host list', type=str)),
        (('--ports',), dict(help='worker port list', type=str)),
    )
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)
    parsed = parser.parse_args()
    required = (('ports', "ports is required"), ('hosts', "hosts is required"))
    for attr_name, message in required:
        if not getattr(parsed, attr_name):
            raise AttributeError(message)
    return parsed
if __name__ == "__main__":
    # Script entry point: parse the command line, then print the config lines.
    cli_args = parse_args()
    main(cli_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment