Skip to content

Instantly share code, notes, and snippets.

@nside
Last active December 24, 2015 17:19
Show Gist options
  • Save nside/6834804 to your computer and use it in GitHub Desktop.
Save nside/6834804 to your computer and use it in GitHub Desktop.
Distributes work items to a callback across worker processes using multiprocessing queues, and writes the callbacks' results to a file.
import multiprocessing
import time
import traceback
from multiprocessing import Pool, JoinableQueue, Process
# End-of-stream marker put on both queues. iter(get, SENTINEL) compares with
# ==, so no real work item or result may compare equal to -1.
SENTINEL = -1


def worker(q, rq, callback):
    """Consume items from *q* until SENTINEL, calling ``callback(rq, item)``.

    The callback reports its output by putting values on *rq*. Every item
    taken from *q*, including the sentinel, is acknowledged with
    ``q.task_done()`` so that a producer waiting in ``q.join()`` can return.

    An exception raised by *callback* is printed to stderr and the worker
    keeps going. If the worker died instead, its remaining items and its
    sentinel would never be acknowledged and the producer's ``q.join()``
    would block forever.
    """
    for item in iter(q.get, SENTINEL):
        try:
            callback(rq, item)
        except Exception:
            traceback.print_exc()
        finally:
            q.task_done()
    q.task_done()  # acknowledge the sentinel itself
def writer(stream, r):
    """Write every result taken from queue *r* to the file at path *stream*.

    Runs until SENTINEL is received. The file is opened in text mode
    (truncating it) and each item is passed straight to ``file.write``, so
    results must be ``str``. Every item, including the sentinel, is
    acknowledged with ``r.task_done()`` so a producer in ``r.join()`` can return.
    """
    # ``with`` guarantees the file is closed even if a write raises.
    with open(stream, 'w') as f:
        for item in iter(r.get, SENTINEL):
            f.write(item)
            r.task_done()
    # Acknowledge the sentinel only after the file is closed (hence flushed),
    # so a caller returning from r.join() sees the complete output file.
    r.task_done()
def distribute(it, callback, nb_workers, write_stream, max_pending=5000):
    """Run ``callback(result_queue, item)`` for every item of *it* in parallel
    and write everything the callbacks put on the result queue to a file.

    Args:
        it: iterable of work items, consumed in the calling process. Items
            are sent to worker processes, so they must be picklable, and
            none may compare equal to SENTINEL.
        callback: picklable callable ``callback(rq, item)``. It reports output
            via ``rq.put(value)``; values are written with ``file.write``, so
            they must be ``str``.
        nb_workers: number of worker processes (must be >= 1).
        write_stream: path of the output file; it is truncated.
        max_pending: maximum number of items queued but not yet taken by a
            worker, bounding memory when *it* is large (<= 0 means unbounded).

    Returns:
        True once every item has been processed and the output file closed.
        Results appear in completion order, not input order.

    Raises:
        ValueError: if *nb_workers* < 1 (no one would ever drain the queue).
    """
    if nb_workers < 1:
        raise ValueError("nb_workers must be >= 1, got %r" % (nb_workers,))

    # A bounded queue makes put() block when full. This replaces polling
    # qsize(), which raises NotImplementedError on platforms such as macOS.
    g = JoinableQueue(maxsize=max_pending)
    r = JoinableQueue()

    procs = []
    for _ in range(nb_workers):
        p = Process(target=worker, args=(g, r, callback))
        p.daemon = True
        p.start()
        procs.append(p)
    rt = Process(target=writer, args=(write_stream, r))
    rt.start()

    for item in it:
        g.put(item)
    g.join()  # every item has been handed to callback and acknowledged

    for _ in procs:
        g.put(SENTINEL)  # one stop marker per worker
    g.join()

    # Join the workers BEFORE stopping the writer: a process that put items
    # on a queue only exits after its feeder thread has flushed them to the
    # pipe. Sending the writer's sentinel earlier could let it overtake
    # buffered results, so the writer would stop early and r.join() would hang.
    for p in procs:
        p.join()

    r.put(SENTINEL)
    r.join()  # writer acknowledges SENTINEL only after closing the file
    rt.join()
    return True
def upper_line(q, line):
    """Example callback: put the upper-cased *line* on result queue *q*."""
    q.put(line.upper())


if __name__ == "__main__":
    # The callback is defined at module level (outside this guard) so it can
    # be pickled by reference under the "spawn" start method. Spawned children
    # re-import this module without running the guarded code.
    with open('test.txt') as lines:
        ok = distribute(lines, upper_line, 4, 'out.txt')
    # Explicit check instead of `assert distribute(...)`: asserts are stripped
    # under `python -O`, which would silently skip the whole run.
    if not ok:
        raise SystemExit("distribute() reported failure")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment