Skip to content

Instantly share code, notes, and snippets.

@kylemcdonald
Created September 8, 2019 15:10
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kylemcdonald/a2f0dcb86f01d4c57b68ac6a6c7a3068 to your computer and use it in GitHub Desktop.
Save kylemcdonald/a2f0dcb86f01d4c57b68ac6a6c7a3068 to your computer and use it in GitHub Desktop.
Testing the performance of Queue-based IPC in Python 3.7.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The built in `Pipe` is great if you are sending bytes. If you are not sending bytes, pickling and unpickling might become a bottleneck. \n",
"\n",
"I also checked https://github.com/portugueslab/arrayqueues which had very bad performance, and is specialized to numpy arrays not byte arrays."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from time import time\n",
"from multiprocessing import Process\n",
"from multiprocessing import Queue\n",
"from multiprocessing import Pipe\n",
"\n",
"n = 1000\n",
"big_data = b'\\0' * 1000 * 1000\n",
"\n",
"def print_elapsed(name, start):\n",
" elapsed = time() - start\n",
" ms_per_item = 1000 * elapsed / n\n",
" item_per_sec = n / elapsed\n",
" print(f'{name}: {ms_per_item:.3f} ms/item, {item_per_sec:.0f} item/sec')\n",
" \n",
"def producer(q):\n",
" start = time()\n",
" for i in range(n):\n",
" q.put(big_data)\n",
" print_elapsed('producer', start)\n",
" \n",
"def consumer(q):\n",
" start = time()\n",
" for i in range(n):\n",
" out = q.get()\n",
" print_elapsed('consumer', start)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"producer: 0.004 ms/item, 283150 item/sec\n",
"consumer: 0.822 ms/item, 1216 item/sec\n"
]
}
],
"source": [
"q = Queue()\n",
"producer_process = Process(target=producer, args=(q,))\n",
"consumer_process = Process(target=consumer, args=(q,))\n",
"consumer_process.start()\n",
"producer_process.start()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"consumer: 0.568 ms/item, 1761 item/sec\n",
"producer: 0.565 ms/item, 1770 item/sec\n"
]
}
],
"source": [
"class PipeQueue():\n",
" def __init__(self, *args):\n",
" self.out_pipe, self.in_pipe = Pipe(*args)\n",
" def put(self, item):\n",
" self.in_pipe.send(item)\n",
" def get(self):\n",
" return self.out_pipe.recv()\n",
" def close(self):\n",
" self.out_pipe.close()\n",
" self.in_pipe.close()\n",
" \n",
"q = PipeQueue()\n",
"producer_process = Process(target=producer, args=(q,))\n",
"consumer_process = Process(target=consumer, args=(q,))\n",
"consumer_process.start()\n",
"producer_process.start()\n",
"consumer_process.join()\n",
"producer_process.join()\n",
"q.close()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"consumer: 0.306 ms/item, 3272 item/sec\n",
"producer: 0.302 ms/item, 3306 item/sec\n"
]
}
],
"source": [
"class BytesPipeQueue():\n",
" def __init__(self, *args):\n",
" self.out_pipe, self.in_pipe = Pipe(*args)\n",
" def put(self, item):\n",
" self.in_pipe.send_bytes(item)\n",
" def get(self):\n",
" return self.out_pipe.recv_bytes()\n",
" def close(self):\n",
" self.out_pipe.close()\n",
" self.in_pipe.close()\n",
" \n",
"q = BytesPipeQueue()\n",
"producer_process = Process(target=producer, args=(q,))\n",
"consumer_process = Process(target=consumer, args=(q,))\n",
"consumer_process.start()\n",
"producer_process.start()\n",
"consumer_process.join()\n",
"producer_process.join()\n",
"q.close()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In CPython setting `duplex=False` uses an `os.pipe` [instead of two blocking sockets](https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/connection.py#L510-L519). This seems to be much slower."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"consumer: 0.735 ms/item, 1361 item/sec\n",
"producer: 0.731 ms/item, 1368 item/sec\n"
]
}
],
"source": [
"q = BytesPipeQueue(False)\n",
"producer_process = Process(target=producer, args=(q,))\n",
"consumer_process = Process(target=consumer, args=(q,))\n",
"consumer_process.start()\n",
"producer_process.start()\n",
"consumer_process.join()\n",
"producer_process.join()\n",
"q.close()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"zmq_producer: 0.390 ms/item, 2561 item/sec\n",
"zmq_consumer: 0.448 ms/item, 2232 item/sec\n"
]
}
],
"source": [
"import zmq\n",
"\n",
"def zmq_producer(address):\n",
" context = zmq.Context()\n",
" push = context.socket(zmq.PUSH)\n",
" push.bind(address)\n",
" start = time()\n",
" for i in range(n):\n",
" push.send(big_data)\n",
" print_elapsed('zmq_producer', start)\n",
" push.close()\n",
"\n",
"def zmq_consumer(address):\n",
" context = zmq.Context()\n",
" pull = context.socket(zmq.PULL)\n",
" pull.connect(address)\n",
" start = time()\n",
" for i in range(n):\n",
" item = pull.recv()\n",
" print_elapsed('zmq_consumer', start)\n",
" pull.close()\n",
" \n",
"address = 'tcp://127.0.0.1:5557'\n",
"producer_process = Process(target=zmq_producer, args=(address,))\n",
"consumer_process = Process(target=zmq_consumer, args=(address,))\n",
"consumer_process.start()\n",
"producer_process.start()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"zmq_producer: 0.369 ms/item, 2711 item/sec\n",
"zmq_consumer: 0.401 ms/item, 2493 item/sec\n"
]
}
],
"source": [
"address = 'ipc:///tmp/zmqtest'\n",
"producer_process = Process(target=zmq_producer, args=(address,))\n",
"consumer_process = Process(target=zmq_consumer, args=(address,))\n",
"consumer_process.start()\n",
"producer_process.start()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"nng_producer: 0.828 ms/item, 1208 item/sec\n",
"nng_consumer: 0.833 ms/item, 1201 item/sec\n"
]
}
],
"source": [
"from pynng import Pair0\n",
"from multiprocessing import Process\n",
"\n",
"def nng_producer(address):\n",
" push = Pair0()\n",
" push.listen(address)\n",
" start = time()\n",
" for i in range(n):\n",
" push.send(big_data)\n",
" print_elapsed('nng_producer', start)\n",
" push.close()\n",
"\n",
"def nng_consumer(address):\n",
" pull = Pair0()\n",
" pull.dial(address)\n",
" start = time()\n",
" for i in range(n):\n",
" item = pull.recv()\n",
" print_elapsed('nng_consumer', start)\n",
" pull.close()\n",
" \n",
"address = 'tcp://127.0.0.1:5557'\n",
"producer_process = Process(target=nng_producer, args=(address,))\n",
"consumer_process = Process(target=nng_consumer, args=(address,))\n",
"consumer_process.start()\n",
"producer_process.start()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"nng_producer: 1.294 ms/item, 773 item/sec\n",
"nng_consumer: 1.301 ms/item, 768 item/sec\n"
]
}
],
"source": [
"address = 'ipc://127.0.0.1:5557'\n",
"producer_process = Process(target=nng_producer, args=(address,))\n",
"consumer_process = Process(target=nng_consumer, args=(address,))\n",
"consumer_process.start()\n",
"producer_process.start()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"producer: 0.611 ms/item, 1637 item/sec\n",
"consumer: 0.614 ms/item, 1629 item/sec\n"
]
}
],
"source": [
"# sudo apt install libboost-dev\n",
"# pip install cinda\n",
"from cinda.ipc import BytesQueue\n",
"from cinda.ipc import free\n",
"\n",
"free('MyQueue')\n",
"q = BytesQueue('MyQueue', n, len(big_data))\n",
"producer_process = Process(target=producer, args=(q,))\n",
"consumer_process = Process(target=consumer, args=(q,))\n",
"consumer_process.start()\n",
"producer_process.start()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
@mficek
Copy link

mficek commented Dec 3, 2020

Great benchmark, thanks for sharing!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment