Skip to content

Instantly share code, notes, and snippets.

@akiross
Created May 29, 2020 15:12
Show Gist options
  • Save akiross/adab52186134a0858bc6cd4d2e3d882b to your computer and use it in GitHub Desktop.
Save akiross/adab52186134a0858bc6cd4d2e3d882b to your computer and use it in GitHub Desktop.
Python multiprocessing with queues example
"""Example on how to use processes to write and read data using multiprocessing.
This example allows you to build readers and writers with a given "baseline" speed
to simulate readers/writers with different properties (e.g. fast readers, slow
writers, or vice versa).
By adjusting baseline speeds and numbers of writers and readers, you can see
what happens to the data being processed.
"""
import contextlib
import multiprocessing as mp
import queue
import random
import time
@contextlib.contextmanager
def measure_time(stats, key):
    """A context manager for measuring elapsed time and counts.

    Accumulates elapsed wall-clock time under ``stats[key]`` and the
    number of measurements under ``stats[key + "_count"]``.

    Args:
        stats: dict used as an accumulator; missing keys are created.
        key: base name for the time and count entries.
    """
    t = time.monotonic()
    try:
        yield
    finally:
        # Update the stats even when the managed block raises, so a
        # failing iteration is still accounted for instead of silently
        # dropping its timing data.
        elapsed = time.monotonic() - t
        stats[key] = stats.get(key, 0) + elapsed
        ck = key + "_count"
        stats[ck] = stats.get(ck, 0) + 1
def rand_time(baseline=1):
    """Return a random non-negative duration centered on *baseline*.

    The jitter is uniform in ``[-sqrt(baseline), sqrt(baseline))``;
    results that would be negative are clamped to zero.
    """
    jitter = (2 * random.random() - 1) * baseline ** 0.5
    return max(0, baseline + jitter)
def make_reader(count_stop, baseline):
    """Builds a reader with a given baseline reading speed.

    Returns a worker ``_reader(q, rid)`` that simulates reading values and
    pushes them onto queue *q* tagged with reader id *rid*.

    NOTE: the worker produces items ``0 .. count_stop`` inclusive, i.e.
    ``count_stop + 1`` items in total (the break fires only once the
    counter exceeds *count_stop*).
    """
    def _reader(q, rid):
        stats = {}  # Per-worker timing stats
        produced = 0
        done = False
        while not done:
            with measure_time(stats, "read_time"):
                print("Reading value", (rid, produced))
                # Simulate some reading time
                time.sleep(rand_time(baseline))
            with measure_time(stats, "queue_put_time"):
                q.put((rid, produced))
            produced += 1
            done = produced > count_stop
        print("Reader stats:", stats)
    return _reader
def make_writer(baseline):
    """Builds a writer with a given baseline writing speed.

    Returns a worker ``_writer(q, all_finished)`` that consumes values
    from queue *q* until *all_finished* is set AND the queue is drained.
    """
    def _writer(q, all_finished):
        stats = {}  # Per-worker timing stats
        while True:
            try:
                with measure_time(stats, "queue_get_wait"):
                    # Get the next value, but if we wait more than
                    # 5 seconds the readers might have finished.
                    v = q.get(timeout=5)
            except queue.Empty:
                # Only exit once the readers are done AND the queue was
                # just confirmed empty. Checking the event as the loop
                # condition (before draining) could drop items enqueued
                # between our last get() and the event being set.
                if all_finished.is_set():
                    break
                continue
            with measure_time(stats, "write_time"):
                print("Writing value", v)
                # Simulate some writing time
                time.sleep(rand_time(baseline))
        print("Writer stats:", stats)
    return _writer
def start_workload(count, n_readers, n_writers, read_baseline, write_baseline):
    """Spawn reader and writer processes and block until all complete.

    Args:
        count: per-reader item counter limit (passed to make_reader).
        n_readers: number of reader processes.
        n_writers: number of writer processes.
        read_baseline: baseline speed for readers.
        write_baseline: baseline speed for writers.
    """
    # Event raised once every reader has finished its job
    readers_done = mp.Event()
    # Queue used to transfer data between readers and writers
    data_q = mp.Queue()

    readers = [
        mp.Process(target=make_reader(count, read_baseline), args=(data_q, rid))
        for rid in range(n_readers)
    ]
    writers = [
        mp.Process(target=make_writer(write_baseline), args=(data_q, readers_done))
        for _ in range(n_writers)
    ]

    for proc in readers + writers:
        proc.start()
    # Readers terminate on their own; wait for them first.
    for proc in readers:
        proc.join()
    # Signal the writers that no more data will arrive...
    readers_done.set()
    # ...then wait for them to drain the queue and exit.
    for proc in writers:
        proc.join()
if __name__ == "__main__":
    # Demo run: 2 readers and 2 writers, all with the same baseline
    # speed (1), and a per-reader counter limit of 5.
    start_workload(5, 2, 2, 1, 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment