Numpy-optimized multiprocessing queue

"""
Numpy-optimized multiprocessing queue object.
$ python numpyqueue.py
Array shape: 2
mp.queue: 0.4327036259928718
numpyqueue: 0.53742205198796
numpyqueue2: 0.5157967879931675
Array shape: 128
mp.queue: 1.7091998109972337
numpyqueue: 0.9855174750118749
numpyqueue2: 0.9052893449988915
Array shape: 256
mp.queue: 11.928916561999358
numpyqueue: 1.832176352996612
numpyqueue2: 1.8839863180037355
"""
import multiprocessing
from contextlib import contextmanager
from queue import Empty  # same exception multiprocessing.queues re-exports

import numpy as np
import sharedmem  # third-party shared-memory arrays: pip install sharedmem


class NumpyQueue(object):
    def __init__(self, shape, maxsize=128, sentinel=None, dtype='float'):
        # The index queue holds at most maxsize - 1 entries so a producer
        # can never lap a consumer that still holds the oldest slot's lock.
        self.queue = multiprocessing.Queue(maxsize=maxsize - 1)
        self.locks = [multiprocessing.Lock() for _ in range(maxsize)]
        # Ring buffer of shared-memory slots, one per in-flight array.
        self.data = sharedmem.empty([maxsize, *shape], dtype=dtype)
        self.idx = multiprocessing.Value('i', 0)
        self.SENTINEL = sentinel
        self.maxsize = maxsize
        self.shape = shape

    def __len__(self):
        return self.maxsize

    def put(self, item, metadata=None):
        """
        Add a numpy array to the queue. The item must have the same shape
        as NumpyQueue.shape. Optionally, metadata can be associated with
        this item.
        """
        if item is self.SENTINEL:
            self.queue.put((self.SENTINEL, metadata))
        else:
            with self.idx:  # the Value's lock serializes writers
                idx = self.idx.value
                with self.locks[idx]:
                    self.data[idx] = item
                    self.queue.put((idx, metadata))
                self.idx.value = (self.idx.value + 1) % self.maxsize

    def __iter__(self):
        """
        Iterate through the items in the queue, yielding tuples of the
        numpy arrays and their associated metadata.
        """
        while True:
            try:
                with self.get(metadata=True) as (item, meta):
                    yield item, meta
            except Empty:
                # PEP 479: raising StopIteration inside a generator would
                # surface as a RuntimeError, so return instead.
                return

    @contextmanager
    def get(self, metadata=False):
        """
        Get the next queued numpy array as a context manager. Optionally
        also return the metadata associated with the item. Raises Empty
        when the sentinel is received.
        """
        idx, meta = self.queue.get()
        if idx is self.SENTINEL:
            raise Empty
        else:
            # Hold the slot's lock while the caller uses the array so the
            # producer cannot overwrite it mid-read.
            with self.locks[idx]:
                if metadata:
                    yield self.data[idx], meta
                else:
                    yield self.data[idx]
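

# Illustrative usage (a sketch added for clarity; `_example_producer` and
# `example_usage` are hypothetical names and nothing below calls them; it
# assumes the default 'fork' start method, as the benchmarks below do):
def _example_producer(q):
    # Push a few arrays with integer metadata, then the sentinel.
    for i in range(3):
        q.put(np.full((4, 4), i), metadata=i)
    q.put(q.SENTINEL)


def example_usage():
    q = NumpyQueue((4, 4), maxsize=8)
    producer = multiprocessing.Process(target=_example_producer, args=(q,))
    producer.start()
    for array, meta in q:  # iteration ends when the sentinel arrives
        print(meta, array[0, 0])
    producer.join()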


class TestProcessNumpyQueue(multiprocessing.Process):
    def __init__(self, slm):
        super().__init__()
        self.slm = slm

    def run(self):
        # Produce 10000 constant-valued arrays, then the sentinel.
        for c in range(10000):
            a = np.zeros(shape=self.slm.shape) + c
            self.slm.put(a, metadata=c)
        self.slm.put(self.slm.SENTINEL)


class TestProcessQueue(multiprocessing.Process):
    def __init__(self, q, shape):
        super().__init__()
        self.shape = shape
        self.q = q

    def run(self):
        for c in range(10000):
            a = np.zeros(shape=self.shape) + c
            self.q.put((a, c))
        self.q.put((None, None))


def test_queue(size):
    # Baseline: a plain multiprocessing.Queue, which pickles every array.
    q = multiprocessing.Queue(maxsize=1024)
    TestProcessQueue(q, (size, size)).start()
    a = 0
    while True:
        sample, meta = q.get()
        if sample is None:
            break
        a += sample[0, 0]
        assert sample[0, 0] == meta
    assert a == 49995000  # sum(range(10000))


def test_numpy_queue(size):
    slm = NumpyQueue((size, size), maxsize=1024)
    TestProcessNumpyQueue(slm).start()
    a = 0
    for sample, meta in slm:
        a += sample[0, 0]
        assert sample[0, 0] == meta
    assert a == 49995000


def test_numpy_queue2(size):
    """
    Alternate NumpyQueue API: consume via the get() context manager
    instead of iterating.
    """
    slm = NumpyQueue((size, size), maxsize=1024)
    TestProcessNumpyQueue(slm).start()
    a = 0
    while True:
        try:
            with slm.get(metadata=True) as (sample, meta):
                a += sample[0, 0]
                assert sample[0, 0] == meta
        except Empty:
            break
    assert a == 49995000


def run_experiment(fxn, size, N):
    import timeit
    stmt = "{}({})".format(fxn.__name__, size)
    return timeit.timeit(stmt=stmt, globals=globals(), number=N)


if __name__ == "__main__":
    N = 1
    for size in (2, 128, 256):
        print("Array shape:", size)
        print("mp.queue:", run_experiment(test_queue, size, N))
        print("numpyqueue:", run_experiment(test_numpy_queue, size, N))
        print("numpyqueue2:", run_experiment(test_numpy_queue2, size, N))