Skip to content

Instantly share code, notes, and snippets.

@clayg
Last active February 3, 2017 20:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clayg/7618571f4055dbc008f5b1323cfdcff2 to your computer and use it in GitHub Desktop.
Save clayg/7618571f4055dbc008f5b1323cfdcff2 to your computer and use it in GitHub Desktop.
from argparse import ArgumentParser
from multiprocessing import Process, Queue
import random
import sys
import time
import logging
parser = ArgumentParser()
parser.add_argument('--workers', default=5, type=int, help='number of workers')
parser.add_argument('--priority', default=None, choices=['light', 'heavy'],
help='set priority to process jobs of that type first')
parser.add_argument('--light-count', default=100, type=int,
help='number of light jobs')
parser.add_argument('--light-size', default=5, type=float,
help='amount of "disk" consumed per light job')
parser.add_argument('--iops', default=50, type=int,
help='amount of "disk" per second')
parser.add_argument('--heavy-count', default=20, type=int,
help='number of heavy jobs')
parser.add_argument('--heavy-size', default=5, type=float,
help='amount of "network" consumed per heavy job')
parser.add_argument('--throughput', default=10, type=int,
help='amount of "network" per second')
parser.add_argument('-v', '--verbose', action='store_true',
help='debug logging')
parser.add_argument('-q', '--quiet', action='store_true',
help='less logging')
def main():
args = parser.parse_args()
if args.quiet:
level = logging.WARNING
elif args.verbose:
level = logging.DEBUG
else:
level = logging.INFO
logging.basicConfig(level=level)
light_jobs = ['light'] * args.light_count
heavy_jobs = ['heavy'] * args.heavy_count
if args.priority == 'light':
jobs = light_jobs + heavy_jobs
elif args.priority == 'heavy':
jobs = heavy_jobs + light_jobs
else:
jobs = light_jobs + heavy_jobs
random.shuffle(jobs)
q = Queue()
disk = Queue(10)
def feed_disk():
rate = 1.0 / args.iops
while True:
logging.debug('giving iop')
disk.put(None)
time.sleep(rate)
disk_p = Process(target=feed_disk)
disk_p.start()
network = Queue(10)
def feed_network():
rate = 1.0 / args.throughput
while True:
logging.debug('giving throughput')
network.put(None)
time.sleep(rate)
network_p = Process(target=feed_network)
network_p.start()
def process_q():
while True:
job = q.get()
if job == 'light':
for i in range(args.light_size):
logging.debug('taking iop')
disk.get()
elif job == 'heavy':
for i in range(args.heavy_size):
logging.debug('taking throughput')
network.get()
else:
break
logging.info('finished %s', job)
workers = []
for i in range(args.workers):
p = Process(target=process_q)
p.start()
workers.append(p)
for j in jobs:
q.put(j)
for p in workers:
q.put(None)
for p in workers:
p.join()
disk_p.terminate()
network_p.terminate()
if __name__ == "__main__":
sys.exit(main())
clayg:~/Workspace/scratch/concurrency$ time python mixed_weight_throughput.py -q
real 0m11.983s
user 0m0.180s
sys 0m0.135s
clayg:~/Workspace/scratch/concurrency$ time python mixed_weight_throughput.py -q --priority heavy
real 0m19.478s
user 0m0.180s
sys 0m0.135s
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment