Skip to content

Instantly share code, notes, and snippets.

@anfedorov
Last active November 3, 2019 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anfedorov/7dae4d7dc7a959ae438b2fa33b343f6c to your computer and use it in GitHub Desktop.
Save anfedorov/7dae4d7dc7a959ae438b2fa33b343f6c to your computer and use it in GitHub Desktop.
Use multiprocessing's primitives to process events while also extending the Queue
from time import sleep
from multiprocessing import Queue, JoinableQueue, Process
class PoolQueue(object):
def __init__(self, n):
self.num_procs = n
self.procs = []
self.payloads = JoinableQueue()
self.results = Queue()
def map(self, f, args):
def add_task(arg):
self.payloads.put(arg)
def process_task():
while True:
payload = self.payloads.get()
result = f(add_task, payload)
self.payloads.task_done()
self.results.put(result)
for arg in args:
add_task(arg)
for _ in range(self.num_procs):
proc = Process(target=process_task)
self.procs.append(proc)
proc.start()
self.payloads.join()
for p in self.procs:
p.kill()
while not self.results.empty():
r = self.results.get()
yield r
def process(add_task, payload):
print(payload)
if payload:
add_task(payload[:-1])
print(payload)
return payload
if __name__ == '__main__':
print(
len(
list(
PoolQueue(3).map(
process,
[
'abcdefghij',
'0987654321',
],
)
)
)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment