Skip to content

Instantly share code, notes, and snippets.

@RomanSteinberg
Created February 25, 2019 15:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save RomanSteinberg/b2631ce7863899b4a0145d05b1389659 to your computer and use it in GitHub Desktop.
Save RomanSteinberg/b2631ce7863899b4a0145d05b1389659 to your computer and use it in GitHub Desktop.
Benchmark parallel execution
from multiprocessing import Process, Queue, Event
from queue import Empty as QueueEmpty
from random import randint, seed
from time import monotonic as now
from datetime import timedelta
# Benchmark parameters used by the __main__ block.
TASKS_COUNT = 12  # number of test() runs per benchmark (passed as tasks_number)
ARRAY_SIZE = 2_000_000  # length of each array generated by gen() inside test()
POOL_SIZE = 2  # number of Worker processes in the multiprocessing benchmark
def mult(x, y):
    """Return the element-wise product of two sequences.

    Pairs are formed positionally and the result stops at the shorter input.
    """
    return list(map(lambda left, right: left * right, x, y))
def gen(sz):
    """Return ``sz`` pseudo-random integers in the closed range [0, 100].

    The module RNG is reseeded with 101 on every call, so each call yields
    the same sequence (and a shorter call is a prefix of a longer one).
    """
    seed(101)
    numbers = [randint(0, 100) for _ in range(sz)]
    return numbers
def test():
    """Compute the benchmark workload: sum of the element-wise product of two arrays.

    Both arrays come from gen(ARRAY_SIZE), which reseeds the RNG with the
    same value, so they are identical and the result is deterministic.
    """
    first, second = gen(ARRAY_SIZE), gen(ARRAY_SIZE)
    return sum(mult(first, second))
class TimeIt(object):
    """Decorator factory that times one call and prints the per-task average.

    The wrapped callable is expected to perform ``tasks_number`` tasks in a
    single call. The elapsed time (measured with ``time.monotonic``) is
    divided by ``tasks_number`` and printed in seconds.

    Args:
        tasks_number: how many tasks one call of the wrapped callable
            performs; must be at least 1.

    Raises:
        ValueError: if ``tasks_number`` is less than 1 (checked at
            construction instead of failing after the timed run).
    """

    def __init__(self, tasks_number=10):
        if tasks_number < 1:
            raise ValueError(f'tasks_number must be >= 1, got {tasks_number}')
        self.tasks_number = tasks_number

    def __call__(self, method):
        from functools import wraps

        # wraps keeps the wrapped function's name/docstring for debugging.
        @wraps(method)
        def wrappee(*args, **kw):
            ts = now()
            result = method(*args, **kw)
            ave_seconds = (now() - ts) / self.tasks_number
            # Print plain seconds: str(timedelta) renders as H:MM:SS.ffffff,
            # which contradicted the trailing "sec" unit.
            print(f'Repeated {self.tasks_number}, average time is {ave_seconds:.6f} sec')
            return result
        return wrappee
# Sequential
def measure_sequential(test_value, tasks_number=10):
    """Run test() ``tasks_number`` times in this process and print the average time.

    Every run whose result differs from ``test_value`` is reported together
    with its step index.
    """
    print(f'Started sequential execution of {tasks_number} tasks')

    def run_all():
        for index in range(tasks_number):
            outcome = test()
            if outcome == test_value:
                continue
            print(f'Step {index} produced wrong result {outcome}')

    timed_run = TimeIt(tasks_number=tasks_number)(run_all)
    timed_run()
# Multiprocessing
class Worker:
    """Executes registered tasks in its own process and checks their results.

    A Worker is constructed in the parent process; ``run`` is meant to be the
    target of a separate ``Process``. Tasks travel through a multiprocessing
    Queue. The pending-task counter and the Events are multiprocessing
    primitives, so ``register``/``is_free``/``stop`` may be called from a
    different process than the one executing ``run``.
    """

    def __init__(self, test_value):
        from multiprocessing import Value

        self._test_value = test_value  # expected return value of every task
        self._task = None  # task currently executing inside run()
        self._queue = Queue()
        # Registered-but-unfinished task count. Its lock also guards
        # _is_free, so "free" is only ever reported once every registered
        # task has completed. Queue.empty() is documented as unreliable
        # and must not be used for this.
        self._pending = Value('i', 0)
        self._is_free = Event()
        self._is_free.set()
        self._stop_received = Event()

    def run(self):
        """Execute queued tasks until ``stop()`` is called.

        The queue is polled with a 1 s timeout so a stop request is noticed
        even when no tasks arrive. A task whose result differs from the
        expected value, or which raises, is reported and the loop continues.
        """
        while not self._stop_received.is_set():
            try:
                self._task = self._queue.get(timeout=1)
            except QueueEmpty:
                continue
            try:
                res = self._task()
            except Exception as exc:
                # Report and keep going: letting the exception escape would
                # leave _is_free cleared forever and hang whoever polls it.
                print(f'Worker {self} task failed: {exc!r}')
            else:
                if res != self._test_value:
                    print(f'Worker {self} produced wrong result {res}')
            finally:
                self._task = None
                self._finish_task()
        # Reset to the initial state so the worker can be run again.
        self._stop_received.clear()

    def _finish_task(self):
        """Decrement the pending count; mark the worker free when it reaches zero."""
        with self._pending.get_lock():
            self._pending.value -= 1
            if self._pending.value == 0:
                self._is_free.set()

    def register(self, task):
        """Queue ``task`` (a picklable zero-argument callable) for execution."""
        # Count and mark busy before the put, so a fast worker can never
        # finish the task and set "free" before we clear it.
        with self._pending.get_lock():
            self._pending.value += 1
            self._is_free.clear()
        self._queue.put(task)

    def is_free(self):
        """Return True when every registered task has finished."""
        return self._is_free.is_set()

    def stop(self):
        """Ask ``run`` to exit after its current poll or task."""
        self._stop_received.set()
def _register_and_wait(workers, tasks_number):
    """Hand out ``tasks_number`` calls of test() round-robin, then stop idle workers.

    Runs in the 'jobs' process and returns once every worker has reported
    free and has been asked to stop.
    """
    from time import sleep

    for step in range(tasks_number):
        workers[step % len(workers)].register(test)
    busy = list(workers)
    while busy:
        still_busy = []
        for w in busy:
            if w.is_free():
                w.stop()
            else:
                still_busy.append(w)
        busy = still_busy
        if busy:
            # Back off between polls: a tight spin here would occupy a CPU
            # core that the benchmarked workers are competing for.
            sleep(0.01)


def _run_jobs(workers, tasks_number):
    """Target of the 'jobs' process: time _register_and_wait and print the per-task average.

    Defined at module level (not nested) so it can be pickled when the
    multiprocessing start method is 'spawn' or 'forkserver'.
    """
    TimeIt(tasks_number=tasks_number)(_register_and_wait)(workers, tasks_number)


def measure_multiprocessing(test_value, tasks_number=10, pool_size=4):
    """Benchmark ``tasks_number`` calls of test() spread over ``pool_size`` processes.

    Starts ``pool_size`` Worker processes plus one 'jobs' process that
    registers the tasks and waits for completion. It prints the start-up
    time, then joins all the processes.

    Raises:
        ValueError: if ``pool_size`` is less than 1.
    """
    if pool_size < 1:
        raise ValueError(f'pool_size must be >= 1, got {pool_size}')

    @TimeIt(tasks_number=1)
    def init():
        workers = [Worker(test_value) for _ in range(pool_size)]
        processes = [Process(target=w.run, name='worker') for w in workers]
        processes.append(Process(target=_run_jobs, name='jobs', args=(workers, tasks_number)))
        for p in processes:
            p.start()
        return processes

    print('Started initialization for multiprocessing')
    started_processes = init()
    print(f'Started {len(started_processes) - 1} processes in multiprocessing mode to execute {tasks_number} '
          f'tasks and 1 to register tasks')
    for p in started_processes:
        p.join()
# Celery (placeholder: no Celery benchmark is implemented in this file)
def _main():
    """Compute the reference result once, then run both benchmarks against it."""
    reference = test()
    print('Master value:', reference)
    measure_sequential(reference, TASKS_COUNT)
    measure_multiprocessing(reference, TASKS_COUNT, POOL_SIZE)


if __name__ == '__main__':
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment