Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eddienko/15002c9cb41969088ddf555cbe3eea98 to your computer and use it in GitHub Desktop.
Save eddienko/15002c9cb41969088ddf555cbe3eea98 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Example using confluent-kafka-python with Tornado\n",
"\n",
"Tornado is a common async framework in Python. It enables concurrency among many non-blocking applications.\n",
"\n",
"Confluent-kafka-python is a Kafka framework by Confluent, the main for-profit entity in this space. It is known to be fast and follow the Java API fairly consistently. It is normally a blocking application.\n",
"\n",
"This notebook shows two approaches to provide non-blocking tornado-friendly operation with the `confluent_kafka` library."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from tornado import gen\n",
"from tornado.ioloop import IOLoop\n",
"import confluent_kafka as ck"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Connect to topic 'foo', set all partitions to offset 0"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"consumer = ck.Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'mygroup-foo2'})\n",
"parts = [ck.TopicPartition('foo', i, 0) for i in range(4)]\n",
"consumer.assign(parts)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Time baseline performance with blocking API"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 160 ms, sys: 0 ns, total: 160 ms\n",
"Wall time: 162 ms\n"
]
}
],
"source": [
"%%time\n",
"\n",
"L = []\n",
"\n",
"for i in range(100000):\n",
" msg = consumer.poll(2)\n",
" L.append(msg.value())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### First attempt with Tornado\n",
"\n",
"We build a Tornado coroutine to get data from our consumer in a non-blocking fashion. Our approach is as follows:\n",
"\n",
"1. If data is available immediately then return it\n",
"2. If data is not available then call the blocking `poll` function in a separate thread\n",
"\n",
"Note that consumers are not threadsafe. We handle this on the tornado side by only having one thread so concurrent calls to this function will process in a first-come-first-served fasion. We assume (perhaps erroneously) that the user isn't calling `consumer.poll` on their own in other threads."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from concurrent.futures import ThreadPoolExecutor\n",
"from time import time\n",
"\n",
"e = ThreadPoolExecutor(1) # Use a single thread"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"from tornado import gen\n",
"\n",
"@gen.coroutine\n",
"def poll(consumer, timeout=1e9):\n",
" msg = consumer.poll(0)\n",
" if msg is None:\n",
" msg = yield e.submit(consumer.poll, timeout)\n",
" return msg"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Profile Tornado first attempt\n",
"\n",
"We reset our topics and make a tornado callback that uses our `poll` function."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.124321699142456 seconds\n"
]
}
],
"source": [
"parts = [ck.TopicPartition('foo', i, 0) for i in range(4)]\n",
"consumer.assign(parts)\n",
"\n",
"L = []\n",
"\n",
"@gen.coroutine\n",
"def poll_many(timeout=1e9):\n",
" start = time()\n",
" for i in range(100000):\n",
" msg = yield poll(consumer, timeout=timeout)\n",
" L.append(msg.value())\n",
" end = time()\n",
" print(end - start, 'seconds')\n",
" \n",
"IOLoop.current().add_callback(poll_many, 10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This runs a bit slower than the pure `confluent_kafka` solution. Tornado's event loop does introduce some overhead. In *many* cases, this is fine. We're still operating at around 70,000 messages per second, which is more than adequate for many applications."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Tornado: second attempt with batching\n",
"\n",
"We can reduce this overhead by batching. Every call to `poll_batch` now returns a list of messages. This list has at least one element and no more than `batchsize` elements. We use the same trick with the thread pool executor."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"@gen.coroutine\n",
"def poll_batch(consumer, timeout=1e9, batchsize=100):\n",
" msg = consumer.poll(0)\n",
" if msg is None:\n",
" msg = yield e.submit(consumer.poll, timeout)\n",
" batch = [msg]\n",
" while len(batch) < 100:\n",
" msg = consumer.poll(0)\n",
" if msg is None:\n",
" break\n",
" else:\n",
" batch.append(msg)\n",
" return batch"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.3510932922363281 seconds\n"
]
}
],
"source": [
"# Reset offsets to zero\n",
"parts = [ck.TopicPartition('foo', i, 0) for i in range(4)]\n",
"consumer.assign(parts)\n",
"\n",
"L = []\n",
"\n",
"@gen.coroutine\n",
"def poll_many(timeout=1e9):\n",
" start = time()\n",
" while len(L) < 100000:\n",
" batch = yield poll_batch(consumer, timeout=timeout, batchsize=100)\n",
" L.extend([msg.value() for msg in batch])\n",
" end = time()\n",
" print(end - start, 'seconds')\n",
"\n",
"IOLoop.current().add_callback(poll_many, 10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is faster. I see times that get all the way down to normal blocking speed. I also see times that are closer to a second. I attribute this variation to other things running on the event loop, but I'm not sure.\n",
"\n",
"This *can* get to full speed."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Other options\n",
"\n",
"Rather than rely on an external thread, we could register a `tornado.locks.Event` object that was triggered every time the internal buffer for a partition went from the empty to non-empty state. This would require changes within the `confluent_kafka` codebase to allow callbacks at various points during operation."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment