It was sometime in 2014 that I first stumbled onto Kafka, and found Jay Kreps' really rather lucid essay "The Log: What every software engineer should know about real-time data's unifying abstraction". To my (admittedly historical-perspective-challenged) mind, it read as a founding text for a new paradigm in messaging systems. And if that was the first thought, of course the next one was "How do I talk to this from Python?"
At the time, the options were limited. There was kafka-python, which wasn't very mature yet, and there was another project called samsa, which was however stuck on an old Kafka protocol version. A bit of a barren landscape. But a lot can happen in a few months: Parsely adopted samsa, renamed it to PyKafka, and started what was essentially a rewrite that brought it up to date, supporting the Kafka 0.8.2 protocol. Best of all: they got me to contribute some bits too.
This is a quick PyKafka primer. PyKafka basically aims to deal with all the particulars of talking to a Kafka cluster. Abstractly, all you need to do is tell it which "topic" your messages need to be sent to or retrieved from, and it will figure out which brokers to talk to, and will also handle any reconnections, cluster reconfigurations and retrying requests. But let's see all that in code.
One more thing before we dive in: if you should want to play along and need a toy cluster, jump to the final section below for a good tip on setting up a scratch Kafka cluster in no time.
PyKafka's API is organised so that most everything elegantly flows from a
single entrypoint, a KafkaClient
instance:
from pykafka import KafkaClient
client = KafkaClient("localhost:9092")
From there, you can discover available Kafka topics (try print client.topics) and then obtain a producer or consumer instance:
topic = client.topics["topicname"]
producer = topic.get_producer()
consumer = topic.get_simple_consumer()
(More on why it's called a "simple" consumer later on.) Should you wish to explore low-level cluster details, you can dive into client.cluster or client.brokers, but for today we'll stick to the higher-level subjects of producing and consuming messages.
At its simplest, all it takes is
producer = topic.get_producer()
producer.produce(b"your message")
For the finer points, there are two things worth touching upon: message partitioning, and asynchronous usage patterns. Or wait - maybe three things.
If your application is suited for it, the producer can be used as a context manager, like so:
with topic.get_producer() as producer:
    while not your_app.needs_stopping:
        producer.produce(your_app.generate_interesting_message())
This will ensure that, upon exiting the context, the program waits for the producer to flush its message queues.
("Wait, what message queues? You didn't mention queues before!")
Ah, that gets me to the second thing. What's important to know here is that, by default, produce() ships your message asynchronously: PyKafka groups messages into batches in the background, which helps reduce overhead and load on the Kafka cluster, and allows your application to charge ahead rather than block on network I/O. However, it also means that when produce() returns, you've no idea whether your message made it to disk on the cluster yet.
You have two options here. If you can afford produce() blocking until message delivery, the simplest thing is to start your producer in synchronous mode:
producer = topic.get_producer(sync=True)
In sync mode, either produce() returns and you know all is fine, or it raises an error. 1
The other option is to enable delivery reporting:
producer = topic.get_producer(delivery_reports=True)
In this mode, the producer instance exposes a queue interface on which it posts "delivery reports" for every message produced. You're then expected to regularly pull the reports from the queue:
msg, exc = producer.get_delivery_report(block=False, timeout=.1)
If exc is not None, delivery failed and you can inspect the msg object, which carries the message and its partitioning key.
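To illustrate the shape of that polling loop without needing a live broker, here's a sketch where a plain queue stands in for the producer's delivery-report queue; the (msg, exc) pairs mirror what get_delivery_report() hands back, but the payload strings and the IOError are made up for the example:

```python
try:
    from queue import Queue, Empty   # Python 3
except ImportError:
    from Queue import Queue, Empty   # Python 2

# Stand-in for the producer's report queue, so this sketch runs without
# a broker; in PyKafka the producer fills it for you with one
# (message, exception) pair per message produced.
reports = Queue()
reports.put(("first payload", None))        # delivered fine
reports.put(("second payload", IOError()))  # delivery failed

failed = []
while True:
    try:
        msg, exc = reports.get(block=False)
    except Empty:
        break  # nothing pending right now; go produce some more
    if exc is not None:
        failed.append(msg)  # e.g. re-produce it, or log it

# failed now holds ["second payload"]
```

The point of the pattern: pull reports opportunistically between produce() calls, and decide per message what a failure should mean for your application.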
That brings me to the last bit I want to highlight here: message partitioning.
By default, messages are randomly allocated to topic partitions, but if you
add a message key to your messages, and define a custom partitioner, you can
achieve any allocation you like.
As an example, pykafka.partitioners.hashing_partitioner
ensures that
messages with a particular key consistently end up on the same partition:
from pykafka.partitioners import hashing_partitioner

producer = topic.get_producer(partitioner=hashing_partitioner)
producer.produce(b"your message", partition_key=b"bravo")
producer.produce(b"another message", partition_key=b"bravo")
will send both to the same topic partition.
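To make the idea concrete, here's a toy partitioner of my own devising - it is not PyKafka's actual hash function, but it shows the principle: hash the key, take it modulo the partition count, and the same key lands on the same partition every time:

```python
def pick_partition(key, num_partitions):
    # Deterministic toy hash of the key bytes; any stable hash works
    # for the principle, though hashing_partitioner uses its own.
    h = 0
    for byte in bytearray(key):
        h = (h * 31 + byte) % (2 ** 32)
    return h % num_partitions

# The same key maps to the same partition, run after run:
assert pick_partition(b"bravo", 4) == pick_partition(b"bravo", 4)
# ...whereas different keys are free to land on different partitions.
spread = {pick_partition(k, 4) for k in (b"alpha", b"bravo", b"charlie")}
```

This is what makes key-based partitioning useful: all messages for one key preserve their relative order, because they travel through a single partition.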
So that's producing in a nutshell. How do we get these messages back out?
Again, this can be as simple as
consumer = topic.get_simple_consumer()
msg = consumer.consume()
print msg.value, msg.partition_key, msg.partition_id, msg.offset
That's it. You pulled a message from the topic, and can now unwrap the payload and message key, and find out what partition it came from (because we didn't specify otherwise, the consumer will read from all partitions that exist in this topic), and at what offset within the partition it sits.
Alternatively, you can iterate over the consumer:
consumer = topic.get_simple_consumer(consumer_timeout_ms=5000)
for msg in consumer:
print msg.value, msg.partition_key, msg.partition_id, msg.offset
Iteration stops if no new messages have become available for consumer_timeout_ms (if you don't specify this, iteration won't stop, and we block, awaiting new messages forever).
If you want the consumer to regularly store its offsets (i.e. how far into each topic partition you've read), and - upon starting - to resume at the last stored offsets, you need to pass it a group name under which to store these:
consumer = topic.get_simple_consumer(consumer_group=b"charlie",
                                     auto_commit_enable=True)
(Alternatively, don't set auto_commit_enable and call consumer.commit_offsets() at your discretion.)
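The bookkeeping behind this is straightforward; here's a toy sketch (my own, not PyKafka internals) of what storing and resuming offsets amounts to:

```python
class OffsetStore(object):
    # Toy offset store: partition id -> next offset to read. PyKafka
    # stores this against the cluster under your consumer_group name;
    # this sketch only shows what the bookkeeping amounts to.
    def __init__(self):
        self.offsets = {}

    def commit(self, partition_id, last_read_offset):
        # Resume one past the last message we actually read.
        self.offsets[partition_id] = last_read_offset + 1

    def resume_at(self, partition_id):
        # A fresh group with nothing stored starts at the beginning.
        return self.offsets.get(partition_id, 0)

store = OffsetStore()
store.commit(0, 41)               # read partition 0 up to offset 41
assert store.resume_at(0) == 42   # after a restart, resume just past it
assert store.resume_at(1) == 0    # untouched partition: start from scratch
```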
There's more: a headline feature of PyKafka is that, like the Java client, it offers a "balanced consumer". What's that do, you ask? Well, the "simple" consumer is called that because it is not aware of other consumer instances that are reading the same topic. You can assign it a subset of all the available topic partitions
parts = topic.partitions
consumer_a = topic.get_simple_consumer(partitions=(parts[0], parts[2]))
consumer_b = topic.get_simple_consumer(partitions=(parts[1], parts[3]))
but adding a third consumer, or removing one, will not reshuffle partitions between them. The balanced consumer is capable of just that.
Balanced consumers connect to Zookeeper (which the Kafka cluster depends upon to coordinate between brokers), enabling coordination of partition assignments between all consumers in a named consumer group. The beauty is that in PyKafka, all it takes is
consumer_a = topic.get_balanced_consumer(consumer_group=b"charlie",
                                         zookeeper_connect="localhost:2181")
This gives you a consumer instance with practically the same interface as the simple consumer - except that, if you were to instantiate a consumer_b in the same group "charlie", consumer_a would automatically be notified of this, and would surrender half of its partitions to consumer_b, and so on. And if consumer_b disappears for some unfortunate reason, consumer_a would tend to its orphaned partitions.
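To give a feel for what such a rebalance works out to, here's a toy round-robin assignment of my own - PyKafka's actual algorithm is coordinated through Zookeeper and may differ in detail, but the outcome is a similarly even split:

```python
def assign_partitions(partition_ids, consumer_ids):
    # Toy round-robin rebalance: deal partitions out to consumers in
    # sorted order. Re-running it with a changed consumer list is the
    # "rebalance": everyone ends up with a roughly equal share.
    consumers = sorted(consumer_ids)
    assignment = dict((c, []) for c in consumers)
    for i, partition in enumerate(sorted(partition_ids)):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

# One consumer in the group owns all four partitions...
assert assign_partitions([0, 1, 2, 3], ["a"]) == {"a": [0, 1, 2, 3]}
# ...until a second one joins and takes half of them off its hands.
assert assign_partitions([0, 1, 2, 3], ["a", "b"]) == {"a": [0, 2], "b": [1, 3]}
```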
If you're looking to squeeze out a few more messages per second, or to save
a few CPU cycles for other uses, it's worth giving the pykafka.rdkafka
module a try. This is a C extension module that wraps the highly performant
librdkafka client library written by Magnus Edenhill.
To get going with librdkafka, make sure the library and development headers are correctly installed in some conventional location (e.g. under /usr/local/) 2. If you then rebuild (or reinstall) pykafka, setup.py will automatically detect the presence of librdkafka and build the extension module.
Now, all it takes is an extra switch when instantiating a producer or consumer:
producer = topic.get_producer(use_rdkafka=True)
consumer = topic.get_simple_consumer(use_rdkafka=True)
consumer = topic.get_balanced_consumer(use_rdkafka=True)
If the extension isn't loadable, any of these calls will raise an error. Otherwise, that's it: you're good to go.
As a side note, the librdkafka C client is actually a fully-fledged client in itself, automatically handling connections, reconnections upon cluster reconfigurations, and retries of failed requests. In that sense, our wrapper code could have avoided talking to Kafka itself altogether. But we wanted the librdkafka-backed classes to serve as drop-in replacements for their pure-Python equivalents, and as of the time of writing, the C client lacked two features of PyKafka: consumer rebalancing, and support for Kafka 0.8.2's broker-based offset storage API. The librdkafka-backed consumer has therefore been written to reuse our Python code for offset handling and consumer rebalancing, building on the C client only to handle consume() calls. The price of this is that we end up with double the number of open connections to Kafka brokers.
This, of course, is just a lot of talk. Let's see about performance: what does the extra effort of hooking up librdkafka buy you?
Let's go straight for the numbers:
[six benchmark graphs here]
All these tests were run against an 8-broker Kafka cluster, running locally on a 32-core system with ample RAM (60G) and after "pre-warming" the disk cache - that is to say, we read the entire test topic into RAM before benchmarking. What you're seeing here is a somewhat condensed summary - for more details see [this topic on the issue tracker][benchmark_issue].
The consumer benefits most from the librdkafka backing, as is clear in the raw throughput numbers, and even more so in the CPU load numbers (the latter measured through the [resource module][docs_resource]). It's also worth noting that the gains here are conservative estimates: the throughput numbers include instance-initialisation time, and the load numbers include everything from the start of the process, including interpreter and module loading. 3
I hope you never made it to the end of this, and have instead already turned to the project page to clone the repo. If anything should be unclear (or not working for you) at any point, there is now a thriving community around PyKafka that can help out. Hit the mailing list or open an issue on the tracker. See you soon.
If you just want to have a play with PyKafka, or if you're a regular user
looking to test an idea quickly, one mental blocker that might slow you
down is that you'd need to bring both a Zookeeper and a Kafka cluster
online to talk to. But hidden in the pykafka.test
module sits a little
gem that does all that for you.
This one-liner downloads the binaries to a tempdir of your choice, then brings up a 2-node Kafka cluster, reporting the brokers' port numbers back when it's all up and running:
# python -m pykafka.test.kafka_instance 2 --download-dir /tmp/kafka-bin/
* Hostname was NOT found in DNS cache
* Trying 208.100.14.200...
* Connected to mirror.reverse.net (208.100.14.200) port 80 (#0)
> GET /pub/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz HTTP/1.1
> User-Agent: curl/7.38.0
> Host: mirror.reverse.net
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Thu, 11 Feb 2016 14:17:41 GMT
* Server Apache is not blacklisted
< Server: Apache
< Last-Modified: Mon, 09 Mar 2015 18:29:48 GMT
< Accept-Ranges: bytes
< Content-Length: 16162559
< Connection: close
< Content-Type: application/x-gzip
<
{ [data not shown]
* Closing connection 0
Cluster started.
Brokers: localhost:9092,localhost:9093
Zookeeper: localhost:2181
Waiting for SIGINT to exit.
That's it! All ready to go. The Kafka topics will be written to a tempdir under /tmp, which will be cleared completely when you kill the script by hitting Ctrl-C.
Footnotes

1. Even in sync mode, if you produce() from multiple threads concurrently, those messages may still be batched together. It's just that each of the producing threads will block waiting for delivery of the batch. This is a nice feature on, say, a multi-threaded webserver. ↩

2. Alternatively, you can export some search paths to help the compiler find the librdkafka headers and lib, [like so][]. ↩

3. "Why?!" you ask? I guess benchmarking is hard to do fairly, and these choices simply gave the least jittery numbers. Benchmarking already-initialised consumers suggested gains for the pykafka.rdkafka flavour of anywhere between 10x and over 40x. ↩

[like so]: https://github.com/Parsely/pykafka/blob/cec20cb79e9a899b67353c1b9da782526144af49/.travis.yml#L38-L40
[benchmark_issue]: https://github.com/Parsely/pykafka/issues/378
[docs_resource]: https://docs.python.org/2/library/resource.html