Skip to content

Instantly share code, notes, and snippets.

@olooney
Created October 25, 2019 00:20
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save olooney/04c48a2bac44b897753a532030730506 to your computer and use it in GitHub Desktop.
Python parallel worker threads with back-pressure
import os
import sys
from time import sleep
from threading import Thread
from queue import Queue
import codecs
def hard_work(x, rounds=100001):
    """Burn CPU time by applying ROT13 to *x* over and over.

    ROT13 is its own inverse, so an odd number of rounds yields the same
    text as a single ROT13 pass and an even number returns the input
    unchanged.  The default round count is odd, so by default the result
    is the ROT13 encoding of *x*.

    Args:
        x: The text to transform.
        rounds: How many ROT13 passes to apply.  Defaults to 100001, the
            value that was previously hard-coded.

    Returns:
        The text after ``rounds`` ROT13 passes.
    """
    # The loop counter itself is irrelevant; only the repetition matters.
    for _ in range(rounds):
        x = codecs.encode(x, "rot_13")
    return x
def reader(fin, queue):
    """Feed every line of *fin* into *queue*, then signal end of stream.

    Each line is forwarded without its trailing newline: the writer stage
    appends its own ``"\\n"`` to every record, so passing the newline
    through would produce a blank line after each record in the output.

    Args:
        fin: An iterable of text lines, typically an open text file.
        queue: The queue that receives the lines.  ``None`` is put last as
            the end-of-stream sentinel.
    """
    try:
        for line in fin:
            # Drop at most one trailing newline; leave other whitespace alone.
            if line.endswith("\n"):
                line = line[:-1]
            queue.put(line)
    finally:
        # Always send the sentinel, even if reading fails part-way, so the
        # downstream stages terminate instead of blocking forever on get().
        queue.put(None)
def make_worker(work):
    """Build a thread target that applies *work* to every queued item.

    Args:
        work: A one-argument callable applied to each item read from the
            input queue; its return value is put on the output queue.

    Returns:
        A function ``worker(read_queue, write_queue)`` that processes items
        until it reads the ``None`` sentinel, then forwards that sentinel
        downstream and returns.
    """
    def worker(read_queue, write_queue):
        try:
            while True:
                inp = read_queue.get()
                if inp is None:
                    break
                write_queue.put(work(inp))
        except Exception:
            # A failing step must not stall the pipeline: keep consuming
            # until the upstream sentinel arrives so producers blocked on a
            # full queue can finish, then let the error propagate.
            while read_queue.get() is not None:
                pass
            raise
        finally:
            # Forward end-of-stream on both the normal and the error path so
            # downstream stages always terminate.
            write_queue.put(None)
    return worker
def writer(queue, fout):
    """Drain *queue* into *fout* until the end-of-stream sentinel arrives.

    Every item taken from the queue is written to *fout* followed by a
    newline.  A ``None`` item ends the loop and is not written.

    Args:
        queue: The queue to consume items from.
        fout: An object with a ``write`` method that receives the output.
    """
    take = queue.get
    emit = fout.write
    while True:
        item = take()
        if item is None:
            return
        emit(item)
        emit("\n")
def parallel_steps(steps, fin, fout, maxsize=2):
    """Run *steps* as a chain of threads between a reader and a writer.

    A reader thread feeds *fin* into the first queue, one worker thread per
    step moves items from its input queue to its output queue, and a writer
    thread drains the last queue into *fout*.  The call returns once every
    thread has finished.

    Args:
        steps: Sequence of one-argument callables, applied in order.
        fin: Input passed to the reader stage.
        fout: Output passed to the writer stage.
        maxsize: Capacity of each inter-stage queue.  A full queue blocks
            the stage producing into it, which is what provides
            back-pressure.
    """
    # One queue per hand-off: reader -> step 1 -> ... -> step N -> writer.
    queues = [Queue(maxsize=maxsize) for _ in range(len(steps) + 1)]

    source = Thread(name="reader", target=reader, args=(fin, queues[0]))
    workers = [
        Thread(
            name="worker-{}".format(number),
            target=make_worker(step),
            args=(inbox, outbox),
        )
        # Pair every step with the queue before it and the queue after it.
        for number, (step, inbox, outbox)
        in enumerate(zip(steps, queues, queues[1:]), start=1)
    ]
    sink = Thread(name="writer", target=writer, args=(queues[-1], fout))

    pipeline = [source] + workers + [sink]
    for stage in pipeline:
        stage.start()
    for stage in pipeline:
        stage.join()
if __name__ == "__main__":
    # Command line: INPUT_PATH OUTPUT_PATH STEP_COUNT
    # The context manager guarantees both files are closed (and the output
    # buffer flushed) even if the pipeline raises.
    with open(sys.argv[1], "rt") as fin, \
            open(sys.argv[2], "wt", buffering=4096) as fout:
        parallel_steps([hard_work] * int(sys.argv[3]), fin, fout, maxsize=10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment