Created
May 29, 2020 15:12
-
-
Save akiross/adab52186134a0858bc6cd4d2e3d882b to your computer and use it in GitHub Desktop.
Python multiprocessing with queues example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Example on how to use processes to write and read data using multiprocessing. | |
This example allows to build readers and writers with a given "baseline" speed | |
to simulate readers/writers with different properties (e.g. fast readers, slow | |
writers, or vice versa). | |
By adjusting baseline speeds and numbers of writers and readers, you can see | |
what happens to the data being processed. | |
""" | |
import contextlib | |
import multiprocessing as mp | |
import queue | |
import random | |
import time | |
@contextlib.contextmanager
def measure_time(stats, key):
    """Context manager that accumulates elapsed time and call counts.

    Adds the elapsed wall time (seconds, monotonic clock) of the managed
    block to ``stats[key]`` and increments ``stats[key + "_count"]``.

    Fix: the bookkeeping now runs in a ``finally`` clause, so the stats are
    updated even when the managed block raises — previously an exception
    skipped the recording entirely and left the stats inconsistent.
    """
    start = time.monotonic()
    try:
        yield
    finally:
        # Record even on the error path so counts match attempts.
        elapsed = time.monotonic() - start
        stats[key] = stats.get(key, 0) + elapsed
        count_key = key + "_count"
        stats[count_key] = stats.get(count_key, 0) + 1
def rand_time(baseline=1):
    """Return a non-negative simulated duration centred on *baseline*.

    The result is ``baseline`` plus uniform noise scaled by
    ``sqrt(baseline)``, clamped at zero so it is always a valid sleep time.
    """
    noise = random.uniform(-1.0, 1.0)
    candidate = noise * baseline ** 0.5 + baseline
    return candidate if candidate > 0 else 0
def make_reader(count_stop, baseline):
    """Build a reader worker with a given baseline reading speed.

    The returned callable takes ``(q, rid)`` and pushes ``(rid, i)`` tuples
    onto the queue, simulating per-item read latency via ``rand_time``.

    Fix: the original post-increment check (``i += 1; if i > count_stop``)
    produced ``count_stop + 1`` items; the loop now emits exactly
    ``count_stop`` items, matching the ``count`` argument at the call site.
    """
    def _reader(q, rid):
        # Timing/counter stats collected by measure_time.
        stats = {}
        for i in range(count_stop):
            with measure_time(stats, "read_time"):
                print("Reading value", (rid, i))
                time.sleep(rand_time(baseline))  # Simulate some reading time
            with measure_time(stats, "queue_put_time"):
                q.put((rid, i))
        print("Reader stats:", stats)
    return _reader
def make_writer(baseline):
    """Build a writer worker with a given baseline writing speed.

    The returned callable takes ``(q, all_finished)`` and drains the queue,
    simulating per-item write latency. It exits once the ``all_finished``
    event is set and the queue has been idle for the 5-second get timeout.
    """
    def _writer(q, all_finished):
        # Timing/counter stats collected by measure_time.
        stats = {}
        while True:
            if all_finished.is_set():
                break
            try:
                # Drain everything currently flowing through the queue.
                while True:
                    with measure_time(stats, "queue_get_wait"):
                        # Block for up to 5 seconds; a timeout means the
                        # readers may have stopped producing.
                        item = q.get(timeout=5)
                    with measure_time(stats, "write_time"):
                        print("Writing value", item)
                        # Simulate some writing time
                        time.sleep(rand_time(baseline))
            except queue.Empty:
                # No data for 5 s — loop around and re-check whether the
                # readers have finished before giving up.
                pass
        print("Writer stats:", stats)
    return _writer
def start_workload(count, n_readers, n_writers, read_baseline, write_baseline):
    """Run *n_readers* reader and *n_writers* writer processes to completion.

    Readers push ``count``-bounded streams of items onto a shared queue at
    ``read_baseline`` speed; writers consume them at ``write_baseline`` speed.
    Returns once every process has exited.
    """
    # Event set once every reader process has finished producing.
    readers_done = mp.Event()
    # Shared transport between reader and writer processes.
    data_q = mp.Queue()

    readers = [
        mp.Process(target=make_reader(count, read_baseline), args=(data_q, rid))
        for rid in range(n_readers)
    ]
    writers = [
        mp.Process(target=make_writer(write_baseline), args=(data_q, readers_done))
        for _ in range(n_writers)
    ]

    # Launch everything: readers first, then writers.
    for proc in readers:
        proc.start()
    for proc in writers:
        proc.start()

    # Once every reader has exited, signal the writers so they stop after
    # draining whatever is left in the queue.
    for proc in readers:
        proc.join()
    readers_done.set()
    for proc in writers:
        proc.join()
if __name__ == "__main__": | |
# All readers and writers have same speed | |
start_workload(5, 2, 2, 1, 1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment