@mrocklin
Last active May 14, 2019 15:50

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Example using confluent-kafka-python with Tornado\n",
"\n",
"Tornado is a common asynchronous framework in Python. It lets a single process run many non-blocking tasks concurrently.\n",
"\n",
"Confluent-kafka-python is a Kafka client library from Confluent, the main for-profit company in this space. It is known to be fast and to follow the Java API fairly closely. Its calls are normally blocking.\n",
"\n",
"This notebook shows two approaches to providing non-blocking, Tornado-friendly operation with the `confluent_kafka` library."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from tornado import gen\n",
"from tornado.ioloop import IOLoop\n",
"import confluent_kafka as ck"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Connect to topic 'foo', set all partitions to offset 0"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"consumer = ck.Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'mygroup-foo2'})\n",
"parts = [ck.TopicPartition('foo', i, 0) for i in range(4)]\n",
"consumer.assign(parts)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Time baseline performance with blocking API"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 160 ms, sys: 0 ns, total: 160 ms\n",
"Wall time: 162 ms\n"
]
}
],
"source": [
"%%time\n",
"\n",
"L = []\n",
"\n",
"for i in range(100000):\n",
"    msg = consumer.poll(2)\n",
"    L.append(msg.value())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### First attempt with Tornado\n",
"\n",
"We build a Tornado coroutine to get data from our consumer in a non-blocking fashion. Our approach is as follows:\n",
"\n",
"1. If data is available immediately, then return it\n",
"2. If data is not available, then call the blocking `poll` function in a separate thread\n",
"\n",
"Note that consumers are not thread-safe. We handle this on the Tornado side by using only one thread, so concurrent calls to this function are processed in a first-come-first-served fashion. We assume (perhaps erroneously) that the user isn't calling `consumer.poll` on their own in other threads."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from concurrent.futures import ThreadPoolExecutor\n",
"from time import time\n",
"\n",
"e = ThreadPoolExecutor(1) # Use a single thread"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"from tornado import gen\n",
"\n",
"@gen.coroutine\n",
"def poll(consumer, timeout=1e9):\n",
"    msg = consumer.poll(0)  # non-blocking check for an already-buffered message\n",
"    if msg is None:\n",
"        msg = yield e.submit(consumer.poll, timeout)  # block in the helper thread instead\n",
"    return msg"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Profile Tornado first attempt\n",
"\n",
"We reset our partition offsets and add a Tornado callback that uses our `poll` function."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.124321699142456 seconds\n"
]
}
],
"source": [
"parts = [ck.TopicPartition('foo', i, 0) for i in range(4)]\n",
"consumer.assign(parts)\n",
"\n",
"L = []\n",
"\n",
"@gen.coroutine\n",
"def poll_many(timeout=1e9):\n",
"    start = time()\n",
"    for i in range(100000):\n",
"        msg = yield poll(consumer, timeout=timeout)\n",
"        L.append(msg.value())\n",
"    end = time()\n",
"    print(end - start, 'seconds')\n",
"\n",
"IOLoop.current().add_callback(poll_many, 10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This runs noticeably slower than the pure `confluent_kafka` solution (about 2 seconds versus 0.16 seconds): Tornado's event loop adds some overhead on every message. In *many* cases, this is fine. We are still handling roughly 47,000 messages per second in the run shown above, which is more than adequate for many applications."
]
},
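{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, the throughput implied by the wall times printed above (these figures come from this particular run and will vary between runs and machines):\n",
"\n",
"```python\n",
"blocking_rate = 100000 / 0.162  # roughly 620,000 messages/second with the blocking loop\n",
"tornado_rate = 100000 / 2.12    # roughly 47,000 messages/second with one yield per message\n",
"```"
]
},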
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Tornado: second attempt with batching\n",
"\n",
"We can reduce this overhead by batching. Every call to `poll_batch` now returns a list of messages. This list has at least one element and no more than `batchsize` elements. We use the same trick with the thread pool executor."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"@gen.coroutine\n",
"def poll_batch(consumer, timeout=1e9, batchsize=100):\n",
"    msg = consumer.poll(0)  # non-blocking check\n",
"    if msg is None:\n",
"        msg = yield e.submit(consumer.poll, timeout)  # block in the helper thread\n",
"    batch = [msg]\n",
"    while len(batch) < batchsize:  # greedily drain whatever else is already buffered\n",
"        msg = consumer.poll(0)\n",
"        if msg is None:\n",
"            break\n",
"        else:\n",
"            batch.append(msg)\n",
"    return batch"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0.3510932922363281 seconds\n"
]
}
],
"source": [
"# Reset offsets to zero\n",
"parts = [ck.TopicPartition('foo', i, 0) for i in range(4)]\n",
"consumer.assign(parts)\n",
"\n",
"L = []\n",
"\n",
"@gen.coroutine\n",
"def poll_many(timeout=1e9):\n",
"    start = time()\n",
"    while len(L) < 100000:\n",
"        batch = yield poll_batch(consumer, timeout=timeout, batchsize=100)\n",
"        L.extend([msg.value() for msg in batch])\n",
"    end = time()\n",
"    print(end - start, 'seconds')\n",
"\n",
"IOLoop.current().add_callback(poll_many, 10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is faster. I see times that get all the way down to normal blocking speed. I also see times that are closer to a second. I attribute this variation to other things running on the event loop, but I'm not sure.\n",
"\n",
"This *can* get to full speed."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Other options\n",
"\n",
"Rather than rely on an external thread, we could register a `tornado.locks.Event` object that is triggered every time the internal buffer for a partition goes from empty to non-empty. This would require changes within the `confluent_kafka` codebase to allow callbacks at various points during operation."
]
}
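,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A rough sketch of that idea, assuming a hypothetical `on_nonempty` hook that `confluent_kafka` does not currently provide:\n",
"\n",
"```python\n",
"from tornado import gen\n",
"from tornado.ioloop import IOLoop\n",
"from tornado.locks import Event\n",
"\n",
"data_ready = Event()\n",
"\n",
"def on_nonempty():\n",
"    # Hypothetical callback: confluent_kafka would invoke this from its own\n",
"    # thread whenever a partition buffer goes from empty to non-empty.\n",
"    IOLoop.current().add_callback(data_ready.set)\n",
"\n",
"@gen.coroutine\n",
"def poll_event(consumer):\n",
"    while True:\n",
"        data_ready.clear()\n",
"        msg = consumer.poll(0)   # non-blocking; the buffer may already hold data\n",
"        if msg is not None:\n",
"            return msg\n",
"        yield data_ready.wait()  # suspend this coroutine until the hook fires\n",
"```"
]
}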
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@vazmin commented Aug 28, 2018

How to use confluent-kafka-python Producer together with Tornado web server?
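
One possible approach, sketched under the assumption that the same single-thread executor trick used in the notebook also covers the Producer's blocking `flush()` call (`produce()` itself only appends to librdkafka's internal queue and returns quickly); the handler, route, and port here are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

import confluent_kafka as ck
from tornado import gen, web
from tornado.ioloop import IOLoop

producer = ck.Producer({'bootstrap.servers': 'localhost:9092'})
flush_pool = ThreadPoolExecutor(1)  # one helper thread for the blocking flush() calls


class ProduceHandler(web.RequestHandler):
    @gen.coroutine
    def post(self):
        # produce() enqueues the message in librdkafka's buffer and does not block
        producer.produce('foo', self.request.body)
        # flush() blocks until delivery, so run it on the helper thread and yield
        yield flush_pool.submit(producer.flush)
        self.write('ok')


if __name__ == '__main__':
    app = web.Application([('/produce', ProduceHandler)])
    app.listen(8888)
    IOLoop.current().start()
```

Flushing on every request is the simplest thing that works but is slow; batching several `produce()` calls and flushing periodically, or waiting on per-message delivery callbacks, would perform better.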
