Skip to content

Instantly share code, notes, and snippets.

@ei-grad
Created October 28, 2016 16:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ei-grad/42028032733df038501018175c20ee61 to your computer and use it in GitHub Desktop.
Save ei-grad/42028032733df038501018175c20ee61 to your computer and use it in GitHub Desktop.
Python multiprocessing.Queue sucks
zmq: 2.904726
queue: 12.812472
from threading import Thread
from time import time
from multiprocessing import Queue
import zmq
def inproc(sock):
while True:
d = sock.recv_pyobj()
if not d:
break
sock.close()
def queue(q):
while True:
d = q.get()
if not d:
break
def test_inproc():
ctx = zmq.Context()
pull = ctx.socket(zmq.PULL)
pull.bind('inproc://worker')
worker = Thread(target=inproc, args=(pull,))
worker.start()
push = ctx.socket(zmq.PUSH)
push.connect('inproc://worker')
t0 = time()
for i in range(1000000):
push.send_pyobj(b'q')
push.send_pyobj(b'')
push.close()
worker.join()
ctx.term()
print("zmq: %f" % (time() - t0))
def test_queue():
q = Queue()
worker = Thread(target=queue, args=(q,))
worker.start()
t0 = time()
for i in range(1000000):
q.put(b'q')
q.put(b'')
worker.join()
print("queue: %f" % (time() - t0))
if __name__ == "__main__":
test_inproc()
test_queue()
@mosquito
Copy link

mosquito commented Dec 25, 2017

In [17]: async def queue(q):
    ...:     while True:
    ...:         d = await q.get()
    ...:         q.task_done()
    ...:
    ...:         if not d:
    ...:             break
    ...:
    ...: async def test_queue():
    ...:     q = asyncio.Queue()
    ...:     loop.create_task(queue(q))
    ...:
    ...:     t0 = time()
    ...:     for i in range(1000000):
    ...:         q.put_nowait(b'q')
    ...:     q.put_nowait(b'')
    ...:     print("published: %f" % (time() - t0))
    ...:
    ...:     await q.join()
    ...:     print("queue: %f" % (time() - t0))
    ...:
    ...: loop.run_until_complete(test_queue())
    ...:
    ...:
published: 1.324600
queue: 3.003884

@mosquito
Copy link

In [1]: import asyncio
   ...: from time import time
   ...:
   ...: loop = asyncio.get_event_loop()
   ...:
   ...:
   ...: async def queue(q):
   ...:     while True:
   ...:         d = await q.get()
   ...:         q.task_done()
   ...:
   ...:         if not d:
   ...:             break
   ...:
   ...: async def test_queue():
   ...:     q = asyncio.Queue()
   ...:     loop.create_task(queue(q))
   ...:
   ...:     t0 = time()
   ...:     for i in range(1000000):
   ...:         q.put_nowait(b'q')
   ...:     q.put_nowait(b'')
   ...:     print("published: %f" % (time() - t0))
   ...:
   ...:     await q.join()
   ...:     print("queue: %f" % (time() - t0))
   ...:
   ...: loop.run_until_complete(test_queue())
   ...:
published: 1.296258
queue: 2.956034

In [2]: import asyncio
   ...: from time import time
   ...: import uvloop
   ...:
   ...: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
   ...: loop = asyncio.get_event_loop()
   ...:
   ...:
   ...: async def queue(q):
   ...:     while True:
   ...:         d = await q.get()
   ...:         q.task_done()
   ...:
   ...:         if not d:
   ...:             break
   ...:
   ...: async def test_queue():
   ...:     q = asyncio.Queue()
   ...:     loop.create_task(queue(q))
   ...:
   ...:     t0 = time()
   ...:     for i in range(1000000):
   ...:         q.put_nowait(b'q')
   ...:     q.put_nowait(b'')
   ...:     print("published: %f" % (time() - t0))
   ...:
   ...:     await q.join()
   ...:     print("queue: %f" % (time() - t0))
   ...:
   ...: loop.run_until_complete(test_queue())
   ...:
published: 1.352311
queue: 3.058588

с uvloop на маке немножко дольше даже

@ei-grad
Copy link
Author

ei-grad commented Dec 25, 2017

норм... а zmq и multiprocessing.Queue запусти чтоб на одном железе сравнить?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment