@jdidion
Created February 29, 2016 21:13
A test of two different ways to implement a multiprocessing architecture for the problem "Read lines from a file, split them into chunks, have workers process the chunks in parallel, and have a writer thread write the results to an output file."
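For orientation, here is a minimal sketch of the same reader -> workers -> writer pipeline built on multiprocessing.Pool rather than hand-rolled processes and queues. It is not part of the gist; process_chunk, read_chunks, run_pipeline and the 4-lines-per-record chunking are illustrative stand-ins for the real work, and Pool.imap does not bound how far the reader can run ahead the way the gist's size-limited input queue does.

from itertools import islice
from multiprocessing import Pool

CHUNKSIZE = 1000        # records per chunk
LINES_PER_RECORD = 4    # e.g. FASTQ

def process_chunk(lines):
    # placeholder for the real per-chunk computation
    return lines

def read_chunks(path):
    # yield lists of raw lines, CHUNKSIZE records at a time
    with open(path, "rb") as f:
        while True:
            chunk = list(islice(f, CHUNKSIZE * LINES_PER_RECORD))
            if not chunk:
                break
            yield chunk

def run_pipeline(infile, outfile, workers=4):
    with Pool(workers) as pool, open(outfile, "wb") as out:
        # imap yields results in submission order, so the writer loop is trivial
        for result in pool.imap(process_chunk, read_chunks(infile)):
            for line in result:
                out.write(line)

The gist's own, more explicit implementations follow.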
# Example of two different structures for multiprocessing code.
# The first uses queues to move chunks of data between the reader
# (the main process), the worker processes (which do the actual
# computation), and the writer process (which writes the results
# to a file). The second uses shared-memory Values and Arrays:
# the reader reads data directly into shared-memory char arrays,
# each worker processes the data in an array and writes the result
# back to that array, and the writer copies the data from the
# finished char arrays to the output file.
#
# When running this on actual data (a file with 4M lines),
# it turns out that there is no significant difference in
# run-time. Since the queue-based method is much simpler to
# write and easier to read, there is no reason to use the
# shared-memory implementation.
from multiprocessing import Process, Queue, Array, Value
from time import sleep, time
from queue import Empty
import sys
CHUNKSIZE = 1000                         # records per chunk
MAX_STR_SIZE = int(150 * 4 * CHUNKSIZE)  # max bytes per chunk (4 lines per record, ~150 bytes per line)
NUM_THREADS = 4                          # number of worker processes
SLEEP_TIME = 1                           # seconds of simulated work per chunk

def test_queues(infile, outfile):
    input_queue = Queue(10)
    output_queue = Queue()
    # control signals the other processes: 0 = still reading,
    # >0 = total number of chunks queued, <0 = abort
    control = Value('h', 0)
    empty_list = [None] * (CHUNKSIZE * 4)
    num_chunks = 0

    class Worker(Process):
        def __init__(self, input_queue, output_queue, control):
            super(Worker, self).__init__()
            self.input_queue = input_queue
            self.output_queue = output_queue
            self.control = control

        def _continue(self):
            with self.control.get_lock():
                return self.control.value >= 0

        def run(self):
            while self._continue():
                try:
                    reads = self.input_queue.get(timeout=1)
                except Empty:
                    with self.control.get_lock():
                        if self.control.value > 0:
                            # reader is done and the queue is empty
                            break
                        else:
                            continue
                # do something with reads (simulated work)
                sleep(SLEEP_TIME)
                self.output_queue.put(reads)

    class Writer(Process):
        def __init__(self, output_queue, control, output_file):
            super(Writer, self).__init__()
            self.output_queue = output_queue
            self.control = control
            self.output_file = output_file

        def _continue(self, seen_chunks):
            with self.control.get_lock():
                val = self.control.value
                if val < 0:
                    return False
                elif val > 0 and seen_chunks >= val:
                    return False
                return True

        def run(self):
            seen_chunks = 0
            with open(self.output_file, "wb") as o:
                while self._continue(seen_chunks):
                    try:
                        reads = self.output_queue.get(timeout=1)
                    except Empty:
                        continue
                    # can use chunk index to preserve order
                    # (see the order-preserving sketch after the code)
                    for read in reads:
                        o.write(read)
                    seen_chunks += 1

    def queue_chunk(infile):
        done = False
        lines = empty_list.copy()
        size = 0
        for i in range(CHUNKSIZE * 4):
            line = infile.readline()
            if len(line) == 0:
                done = True
                break
            lines[i] = line
            size += 1
        # the following blocks if the queue is full, to prevent
        # consuming too much memory with waiting data
        queued = False
        if not done:
            input_queue.put(lines)
            queued = True
        elif size > 0:
            input_queue.put(lines[0:size])
            queued = True
        return done, queued

    with open(infile, "rb") as f:
        # start worker processes
        workers = []
        for i in range(NUM_THREADS):
            worker = Worker(input_queue, output_queue, control)
            workers.append(worker)
            worker.start()
        # start writer process
        writer = Writer(output_queue, control, outfile)
        writer.start()
        # queue chunks until the input is exhausted, counting every chunk
        # that was actually queued (including a final partial chunk)
        while True:
            done, queued = queue_chunk(f)
            if queued:
                num_chunks += 1
            if done:
                break
        print("Done queuing chunks")
        with control.get_lock():
            control.value = num_chunks
        writer.join()
        print("Writer done")
        for i, worker in enumerate(workers):
            worker.join()
            print("Worker {} done".format(i))

def test_shared_mem(infile, outfile):
    input_queue = Queue()
    output_queue = Queue()
    done_queue = Queue()
    # 10 reusable shared-memory buffers, plus per-buffer chunk index and size
    chunks = [Value('l', 0) for i in range(10)]
    values = [Array('c', MAX_STR_SIZE) for i in range(10)]
    sizes = [Value('l', 0) for i in range(10)]
    # control signals the other processes: 0 = still reading,
    # >0 = total number of chunks queued, <0 = abort
    control = Value('h', 0)
    chunk_idx = 0

    class Worker(Process):
        def __init__(self, values, sizes, input_queue, output_queue, control):
            super(Worker, self).__init__()
            self.values = values
            self.sizes = sizes
            self.input_queue = input_queue
            self.output_queue = output_queue
            self.control = control

        def _continue(self):
            with self.control.get_lock():
                return self.control.value >= 0

        def run(self):
            while self._continue():
                try:
                    idx = self.input_queue.get(timeout=1)
                except Empty:
                    with self.control.get_lock():
                        if self.control.value > 0:
                            # reader is done and the queue is empty
                            break
                        else:
                            continue
                size = self.sizes[idx].value
                if size == 0:
                    break
                reads = self.values[idx][0:size]
                # do something with reads (simulated work)
                sleep(SLEEP_TIME)
                self.values[idx][0:size] = reads
                self.output_queue.put(idx)

    class Writer(Process):
        def __init__(self, chunks, values, sizes, output_queue, done_queue, control, output_file):
            super(Writer, self).__init__()
            self.chunks = chunks
            self.values = values
            self.sizes = sizes
            self.output_queue = output_queue
            self.done_queue = done_queue
            self.control = control
            self.output_file = output_file

        def _continue(self, seen_chunks):
            with self.control.get_lock():
                val = self.control.value
                if val < 0:
                    return False
                elif val > 0 and seen_chunks >= val:
                    return False
                return True

        def run(self):
            seen_chunks = 0
            with open(self.output_file, "wb") as o:
                while self._continue(seen_chunks):
                    try:
                        idx = self.output_queue.get(timeout=1)
                    except Empty:
                        continue
                    # can use chunk index to preserve order
                    chunk = self.chunks[idx].value
                    size = self.sizes[idx].value
                    reads = self.values[idx][0:size]
                    o.write(reads)
                    seen_chunks += 1
                    # hand the buffer back to the reader for re-use
                    self.done_queue.put(idx)

    def queue_chunk(infile, value_idx, chunk_idx):
        size = 0
        done = False
        # NOTE: it is possible to get a speed-up here if your data
        # records are fixed-size, meaning you can read a fixed number
        # of bytes here rather than having to read one line at a time.
        # We could also get a speed-up if we could read from the file
        # directly into the Array, but I don't know how to do that
        # since it doesn't look like you can sub-class FileIO
        # (a readinto sketch appears after the code).
        for i in range(CHUNKSIZE * 4):
            line = infile.readline()
            if len(line) == 0:
                done = True
                break
            start = size
            size += len(line)
            if size > MAX_STR_SIZE:
                raise Exception("Too many bytes")
            values[value_idx][start:size] = line
        queued = False
        if size > 0:
            chunks[value_idx].value = chunk_idx
            sizes[value_idx].value = size
            input_queue.put(value_idx)
            queued = True
        return done, queued

    with open(infile, "rb") as f:
        # read the initial set of chunks, one per shared-memory buffer
        done = False
        for i in range(10):
            done, queued = queue_chunk(f, i, chunk_idx)
            if queued:
                chunk_idx += 1
            if done:
                break
        # start worker processes
        workers = []
        for i in range(NUM_THREADS):
            worker = Worker(values, sizes, input_queue, output_queue, control)
            workers.append(worker)
            worker.start()
        # start writer process
        writer = Writer(chunks, values, sizes, output_queue, done_queue, control, outfile)
        writer.start()
        # handle completed chunks and queue new ones into the freed buffers,
        # counting every chunk that was actually queued
        while not done:
            value_idx = done_queue.get()
            done, queued = queue_chunk(f, value_idx, chunk_idx)
            if queued:
                chunk_idx += 1
        print("Done queuing reads")
        with control.get_lock():
            control.value = chunk_idx
        writer.join()
        print("Writer done")
        for i, worker in enumerate(workers):
            worker.join()
            print("Worker {} done".format(i))

if __name__ == "__main__":
    start = time()
    test_queues(sys.argv[1], "test_queues.out.fq")
    print("test_queues took {0:.2f} secs".format(time() - start))
    start = time()
    test_shared_mem(sys.argv[1], "test_shared_mem.out.fq")
    print("test_shared_mem took {0:.2f} secs".format(time() - start))
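Both Writer classes note that the chunk index could be used to preserve output order. One way that could look, sketched here as an assumption rather than anything in the gist: the output queue would have to carry (chunk_index, reads) pairs, and out-of-order results are parked until the next expected index arrives.

def write_in_order(output_queue, output_file, total_chunks):
    # park out-of-order chunks keyed by chunk index, flush contiguously
    pending = {}
    next_idx = 0
    with open(output_file, "wb") as o:
        while next_idx < total_chunks:
            chunk_idx, reads = output_queue.get()
            pending[chunk_idx] = reads
            while next_idx in pending:
                for read in pending.pop(next_idx):
                    o.write(read)
                next_idx += 1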