Skip to content

Instantly share code, notes, and snippets.

@mberlanda
Created June 17, 2022 22:24
Show Gist options
  • Save mberlanda/b114a4ee5df52af99a11bafe9e03d161 to your computer and use it in GitHub Desktop.
Save mberlanda/b114a4ee5df52af99a11bafe9e03d161 to your computer and use it in GitHub Desktop.
Sample demonstration of a wrapper around an mp.JoinableQueue enforcing message deduplication via an mp.Manager and an mp.Lock.
import multiprocessing as mp
import os
import time
from dataclasses import dataclass
from datetime import datetime
@dataclass
class JobItem:
    """A unit of work identified by its two integer coordinates."""

    x: int
    y: int

    # Hash on the repr string so field-equal items collide on purpose: the
    # queue wrapper uses JobItem instances as keys in a Manager dict for
    # de-duplication. str(self) is the idiomatic spelling of self.__str__().
    def __hash__(self):
        return hash(str(self))
# https://stackoverflow.com/a/55563139
def delegate(to, *methods):
    """Class decorator forwarding *methods* to the attribute named *to*.

    For each name in *methods*, a thin forwarding function is attached to
    the decorated class; calling it resolves ``getattr(self, to)`` and
    invokes the same-named method on that object with the given arguments.
    """
    def dec(klass):
        def create_delegator(method):
            def delegator(self, *args, **kwargs):
                target = getattr(self, to)
                return getattr(target, method)(*args, **kwargs)
            # Name the generated function after the delegated method so
            # tracebacks and introspection stay readable instead of showing
            # an anonymous 'delegator' for every forwarded call.
            delegator.__name__ = method
            return delegator
        for m in methods:
            setattr(klass, m, create_delegator(m))
        return klass
    return dec
@delegate('queue', 'empty', 'get', 'join', 'task_done')
class JoinableQueueWrapper:
    """A JoinableQueue that drops duplicate items on push.

    De-duplication state lives in a Manager dict shared across processes;
    a Lock makes the check-then-put in push_item atomic. empty/get/join/
    task_done are delegated straight to the underlying JoinableQueue.
    """

    def __init__(self):
        self.queue = mp.JoinableQueue()
        # item -> number of times it was pushed (first push included)
        self.cache = mp.Manager().dict()
        self.lock = mp.Lock()

    def push_item(self, item):
        """Enqueue *item* unless an equal item was pushed before.

        Duplicate pushes only increment the item's counter in the cache;
        the item itself is not enqueued again.
        """
        with self.lock:
            # Membership test on the proxy itself: `self.cache.keys()`
            # would ship the entire key list over from the manager
            # process on every push just to do one lookup.
            if item in self.cache:
                self.cache[item] += 1
                return
            self.queue.put(item)
            self.cache[item] = 1
# Shared, process-safe list of every item the workers handled; consumed
# only by the reporting code at the end of the run.
processed_items = mp.Manager().list()
# Number of 1-second waits a worker tolerates on an empty queue before it exits.
WAIT_INTERVAL = 3
def worker(queue):
    """Consume items from *queue* until it stays empty for WAIT_INTERVAL seconds.

    Each item is recorded in the shared `processed_items` list and fans out
    up to three derived JobItems back onto the queue; the wrapper's cache
    silently drops duplicates. The worker gives up after WAIT_INTERVAL
    consecutive seconds of observing an empty queue.
    """
    worker_wait_interval = WAIT_INTERVAL
    pid = os.getpid()
    print("[{} - {}] - Worker started".format(pid, datetime.now()))
    while True:
        if queue.empty():
            if worker_wait_interval < 1:
                break
            print("[{} - {}] - Queue empty. Waiting {} seconds before stopping".format(pid, datetime.now(), worker_wait_interval))
            time.sleep(1)
            worker_wait_interval -= 1
            continue
        # Something showed up: reset the patience counter.
        worker_wait_interval = WAIT_INTERVAL
        # NOTE(review): another worker may drain the queue between empty()
        # and get(), in which case this get() blocks until a sibling pushes
        # new children — acceptable for this demo, where p.join() on the
        # daemonized workers (not queue.join()) gates shutdown.
        item = queue.get()
        processed_items.append(str(item))
        x = int(item.x / 2)
        y = int(item.y / 3)
        if x > 0 or y > 0:
            queue.push_item(JobItem(x, y))
            queue.push_item(JobItem(x, y + 1))
            queue.push_item(JobItem(x - 1, y))
        time.sleep(0.1)
        print("[{} - {}] - Processing item: {}".format(pid, datetime.now(), item))
        # Mark the item done only AFTER its children were enqueued, so
        # JoinableQueue.join() cannot return while follow-up work derived
        # from this item is still pending. (The original called task_done()
        # right after get(), which breaks that invariant.)
        queue.task_done()
if __name__ == '__main__':
    start_time = datetime.now()
    my_queue = JoinableQueueWrapper()
    # Twice the CPU count: workers spend most of their time sleeping,
    # so oversubscribing cores is fine here.
    max_workers = mp.cpu_count() * 2
    workers = [mp.Process(target=worker, args=(my_queue,)) for i in range(max_workers)]
    # Seed the queue BEFORE starting workers so none of them sees an
    # empty queue and counts down toward exit immediately.
    my_queue.push_item(JobItem(12554530, 2018052))
    my_queue.push_item(JobItem(3210 * 54, 415 * 567))
    my_queue.push_item(JobItem(74335, 5791))
    my_queue.push_item(JobItem(1033, 19795))
    for p in workers:
        # Daemonize so a stray worker cannot keep the interpreter alive.
        p.daemon = True
        p.start()
    # wait for all workers to complete their jobs
    for p in workers:
        p.join()
    # wait for the queue to be empty
    my_queue.join()
    # collect metrics to report data
    total_processed_items = len(processed_items)
    uniq_processed_items = len(set(processed_items))
    # Every push beyond the first per unique item was de-duplicated:
    # cache values count all pushes, so subtract the unique ones.
    total_dedup_items = sum(my_queue.cache.values()) - uniq_processed_items
    print("total time: {}, workers: {}, total_processed_items: {}, uniq_processed_items: {}, total_dedup_items: {}".format(
        datetime.now() - start_time,
        max_workers,
        total_processed_items,
        uniq_processed_items,
        total_dedup_items
    ))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment