Skip to content

Instantly share code, notes, and snippets.

@kylekyle
Created March 22, 2022 01:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kylekyle/f2bd3f1606d1fe581161d9fa75581a88 to your computer and use it in GitHub Desktop.
Save kylekyle/f2bd3f1606d1fe581161d9fa75581a88 to your computer and use it in GitHub Desktop.
A full example of a producer/consumer using process pools in python
from threading import Event
from time import sleep
from traceback import print_exception
from multiprocessing import Pool, Manager
from queue import Full
from signal import signal, SIGTERM, SIGINT
class Consumer1:
def __init__(self):
print("Initializing Consumer1 ...")
def consume(self, batch):
sleep(0.001)
class Consumer2:
def __init__(self):
print("Initializing Consumer2 ...")
def consume(self, batch):
pass
def consume(q):
consumers = [Consumer1(), Consumer2()]
while True:
batch = q.get()
for consumer in consumers:
consumer.consume(batch)
if __name__ == '__main__':
num_processes = 10
with Pool(num_processes) as pool:
batch = 0
stopped = Event()
queue = Manager().Queue(num_processes)
for n in [SIGINT, SIGTERM]:
signal(n, lambda *a: stopped.set())
def error(ex):
print(ex)
print_exception(type(ex), ex, ex.__traceback__)
stopped.set()
for i in range(num_processes):
pool.apply_async(consume, (queue,), {}, None, error)
while not stopped.is_set():
batch += 1
try:
queue.put_nowait(batch)
except Full:
print("skipped batch", batch)
print("Terminating pool ...")
pool.terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment