PyKafka: the pythonic Kafka client library

It was sometime in 2014 that I first stumbled onto Kafka, and found Jay Kreps' really rather lucid essay "The Log: What every software engineer should know about real-time data's unifying abstraction". To my (admittedly historical-perspective-challenged) mind, it read as a founding text for a new paradigm in messaging systems. And if that was the first thought, of course the next one was "How do I talk to this from Python?"

At the time, the options were limited. There was kafka-python, which wasn't very mature yet, and there was another project called samsa, which was stuck on an outdated version of the Kafka protocol. A bit of a barren landscape. But a lot can happen in a few months: Parsely adopted samsa, renamed it to PyKafka, and started what was essentially a rewrite, bringing it up to date with the Kafka 0.8.2 protocol. Best of all: they got me to contribute some bits too.

This is a quick PyKafka primer. PyKafka basically aims to deal with all the particulars of talking to a Kafka cluster. Abstractly, all you need to do is tell it which "topic" your messages need to be sent to or retrieved from, and it will figure out which brokers to talk to, and will also handle any reconnections, cluster reconfigurations and retrying requests. But let's see all that in code.

One more thing before we dive in: if you should want to play along and need a toy cluster, jump to the final section below for a good tip on setting up a scratch Kafka cluster in no time.

Basic pykafka usage

PyKafka's API is organised so that most everything elegantly flows from a single entrypoint, a KafkaClient instance:

from pykafka import KafkaClient
client = KafkaClient("localhost:9092")

From there, you can discover available Kafka topics (try print client.topics) and then obtain a producer or consumer instance:

topic = client.topics["topicname"]
producer = topic.get_producer()
consumer = topic.get_simple_consumer()

(More on why it's called a "simple" consumer later on.) Should you wish to explore low-level cluster details, you can dive into client.cluster or client.brokers, but for today we'll stick to the higher-level subjects of producing and consuming messages.

Producing to Kafka

At its simplest, all it takes is

producer = topic.get_producer()
producer.produce(b"your message")

For the finer points, there are two things worth touching upon: message partitioning, and asynchronous usage patterns. Or wait - maybe three things.

If your application is suited for it, the producer can be used as a context manager, like so:

with topic.get_producer() as producer:
    while not your_app.needs_stopping:
        producer.produce(your_app.generate_interesting_message())

This will ensure that, upon exiting the context, the program waits for the producer to flush its message queues.

("Wait, what message queues? You didn't mention queues before!")

Ah, that gets me to the second thing. What's important to know here is that, by default, produce() ships your message asynchronously: PyKafka groups messages into batches in the background, which reduces overhead and load on the Kafka cluster, and lets your application charge ahead rather than block on network I/O. However, it also means that when produce() returns, you have no idea whether your message has made it to disk on the cluster yet.

You have two options here. If you can afford produce() blocking until message delivery, the simplest thing is to start your producer in synchronous mode:

producer = topic.get_producer(sync=True)

In sync mode, either produce() returns and you know all is fine, or it raises an error. 1
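
As a minimal sketch (catching the broad Exception here is purely for illustration; in real code you would catch the specific pykafka exception you expect):

producer = topic.get_producer(sync=True)
try:
    producer.produce(b"your message")
except Exception as exc:
    # delivery failed; log it, retry, or bail out as appropriate
    print "message not delivered:", exc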

The other option is to enable delivery reporting:

producer = topic.get_producer(delivery_reports=True)

In this mode, the producer instance exposes a queue interface on which it posts "delivery reports" for every message produced. You're then expected to regularly pull the reports from the queue:

msg, exc = producer.get_delivery_report(block=False, timeout=.1)

If exc is not None, delivery failed and you can inspect the msg object which carries the message and its partitioning key.
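
Put together, a produce-and-check cycle might look roughly like the sketch below. To the best of my knowledge, get_delivery_report follows stdlib queue semantics, raising Queue.Empty when block=False and no report is pending:

import Queue  # stdlib; renamed to queue in Python 3

producer = topic.get_producer(delivery_reports=True)
producer.produce(b"your message", partition_key=b"alpha")
# ... later, or periodically from within your producing loop:
while True:
    try:
        msg, exc = producer.get_delivery_report(block=False)
    except Queue.Empty:
        break  # no reports pending right now
    if exc is not None:
        print "delivery failed for key", msg.partition_key, ":", repr(exc)
    else:
        print "delivered message with key", msg.partition_key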

That brings me to the last bit I want to highlight here: message partitioning. By default, messages are randomly allocated to topic partitions, but if you add a message key to your messages, and define a custom partitioner, you can achieve any allocation you like. As an example, pykafka.partitioners.hashing_partitioner ensures that messages with a particular key consistently end up on the same partition:

from pykafka.partitioners import hashing_partitioner

producer = topic.get_producer(partitioner=hashing_partitioner)
producer.produce(b"your message", partition_key=b"bravo")
producer.produce(b"another message", partition_key=b"bravo")

will send both to the same topic partition.

So that's producing in a nutshell. How do we get these messages back out?

Consuming from Kafka

Again, this can be as simple as

consumer = topic.get_simple_consumer()
msg = consumer.consume()
print msg.value, msg.partition_key, msg.partition_id, msg.offset

That's it. You've pulled a message from the topic, and can now unwrap the payload and message key, find out which partition it came from (because we didn't specify otherwise, the consumer reads from all partitions that exist in this topic), and see at what offset within that partition it sits.

Alternatively, you can iterate over the consumer:

consumer = topic.get_simple_consumer(consumer_timeout_ms=5000)
for msg in consumer:
    print msg.value, msg.partition_key, msg.partition_id, msg.offset

Iteration stops once no new messages have become available for consumer_timeout_ms milliseconds (if you don't specify this, iteration never stops: the consumer blocks, awaiting new messages forever).

If you want the consumer to regularly store its offsets (i.e. how far into each topic partition you've read), and - upon starting - to resume at the last stored offsets, you need to pass it a group name under which to store these:

consumer = topic.get_simple_consumer(consumer_group=b"charlie",
                                     auto_commit_enable=True)

(Alternatively, don't set auto_commit_enable and call consumer.commit_offsets() at your discretion.)
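
For instance, a consumer loop that commits after every message might look like this (committing that frequently is just for illustration, and handle_message is a stand-in for your own processing code):

consumer = topic.get_simple_consumer(consumer_group=b"charlie")
for msg in consumer:
    handle_message(msg)        # hypothetical: whatever your application does
    consumer.commit_offsets()  # persist how far into each partition we've read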

There's more: a headline feature of PyKafka is that, like the Java client, it offers a "balanced consumer". What's that do, you ask? Well, the "simple" consumer is called that because it is not aware of other consumer instances that are reading the same topic. You can assign it a subset of all the available topic partitions

parts = topic.partitions
consumer_a = topic.get_simple_consumer(partitions=(parts[0], parts[2]))
consumer_b = topic.get_simple_consumer(partitions=(parts[1], parts[3]))

but adding a third consumer, or removing one, will not reshuffle partitions between them. The balanced consumer is capable of just that.

Balanced consumers connect to Zookeeper (which the Kafka cluster depends upon to coordinate between brokers), enabling coordination of partition assignments between all consumers in a named consumer group. The beauty is that in PyKafka, all it takes is

consumer_a = topic.get_balanced_consumer(consumer_group=b"charlie",
                                         zookeeper_connect="localhost:2181")

This gives you a consumer instance with practically the same interface as the simple consumer - except that, if you instantiate a consumer_b in the same group "charlie", consumer_a is automatically notified and surrenders half of its partitions to consumer_b, and so on. And if consumer_b disappears for some unfortunate reason, consumer_a takes over its orphaned partitions.
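
To make that concrete, spinning up consumer_b is just another call with the same group name (typically from another process or machine):

consumer_b = topic.get_balanced_consumer(consumer_group=b"charlie",
                                         zookeeper_connect="localhost:2181")
# once the rebalance settles, consumer_a and consumer_b each hold
# roughly half of the topic's partitions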

The pykafka.rdkafka module

If you're looking to squeeze out a few more messages per second, or to save a few CPU cycles for other uses, it's worth giving the pykafka.rdkafka module a try. This is a C extension module that wraps the highly performant librdkafka client library written by Magnus Edenhill.

To get going with librdkafka, make sure the library and development headers are correctly installed in some conventional location (e.g. under /usr/local/) 2. If you then rebuild (or reinstall) pykafka, setup.py will automatically detect the presence of librdkafka and build the extension module.

Now, all it takes is an extra switch when instantiating a producer or consumer:

producer = topic.get_producer(use_rdkafka=True)
consumer = topic.get_simple_consumer(use_rdkafka=True)
consumer = topic.get_balanced_consumer(use_rdkafka=True)

If the extension isn't loadable, any of these calls will raise an error. Otherwise, that's it: you're good to go.

As a side note, the librdkafka C client is actually a fully-fledged client in itself: it handles connections, reconnections upon cluster reconfigurations, and retrying of failed requests, all automatically. In that sense, our wrapper code could have avoided talking to Kafka itself altogether. But we wanted the librdkafka-backed classes to be drop-in replacements for their pure-Python equivalents, and as of the time of writing, the C client lacked two features of PyKafka: consumer rebalancing, and support for Kafka 0.8.2's broker-based offset storage API. The librdkafka-backed consumer has therefore been written to reuse our Python code for offset handling and consumer rebalancing, building on the C client only to handle consume() calls. The price of this is that we end up with double the number of open connections to Kafka brokers.

This, of course, is just a lot of talk. Let's see about performance: what does the extra effort of hooking up librdkafka buy you?

Performance assessment

Let's go straight for the numbers:

[ six graphs here ? ]

All these tests were run against an 8-broker Kafka cluster, running locally on a 32-core system with ample RAM (60G) and after "pre-warming" the disk cache - that is to say, we read the entire test topic into RAM before benchmarking. What you're seeing here is a somewhat condensed summary - for more details see [this topic on the issue tracker][benchmark_issue].

The consumer benefits most from the librdkafka backing, as is clear in the raw throughput numbers, and more so in the CPU load numbers (the latter measured through the [resource module][docs_resource]). It's worth noting also that the gains here are conservative estimates: the throughput numbers include instance-initialisation time, and the load numbers include everything from the start of the process, including interpreter and module loading. 3

Wrap-up

I hope you never made it to the end of this, and have instead already turned to the project page to clone the repo. If anything should be unclear (or not working for you) at any point, there is now a thriving community around PyKafka that can help out. Hit the mailing list or open an issue on the tracker. See you soon.

PS: (Ab)using testinstances tools to bring up a quick throw-away cluster

If you just want to have a play with PyKafka, or if you're a regular user looking to test an idea quickly, one mental blocker that might slow you down is that you'd need to bring both a Zookeeper instance and a Kafka cluster online to talk to. But hidden in the pykafka.test module sits a little gem that does all that for you.

This one-liner downloads the binaries to a tempdir of your choice, then brings up a 2-node Kafka cluster, reporting the brokers' port numbers back when it's all up and running:

# python -m pykafka.test.kafka_instance 2 --download-dir /tmp/kafka-bin/
* Hostname was NOT found in DNS cache
*   Trying 208.100.14.200...
* Connected to mirror.reverse.net (208.100.14.200) port 80 (#0)
> GET /pub/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz HTTP/1.1
> User-Agent: curl/7.38.0
> Host: mirror.reverse.net
> Accept: */*
> 
< HTTP/1.1 200 OK
< Date: Thu, 11 Feb 2016 14:17:41 GMT
* Server Apache is not blacklisted
< Server: Apache
< Last-Modified: Mon, 09 Mar 2015 18:29:48 GMT
< Accept-Ranges: bytes
< Content-Length: 16162559
< Connection: close
< Content-Type: application/x-gzip
< 
{ [data not shown]
* Closing connection 0
Cluster started.
Brokers: localhost:9092,localhost:9093
Zookeeper: localhost:2181
Waiting for SIGINT to exit.

That's it! All ready to go. The Kafka topics will be written to a tempdir under /tmp, which will be cleared completely when you kill the script by striking ctrl-c.
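
From another shell, you can then point PyKafka at the throw-away cluster using the broker list it printed (the ports may differ on your machine):

from pykafka import KafkaClient
client = KafkaClient("localhost:9092,localhost:9093")
print client.topics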

Footnotes

  1. Even in sync mode, if you produce() from multiple threads concurrently, those messages may still be batched together; it's just that each producing thread will block, waiting for delivery of the batch. This is a nice property to have in, say, a multi-threaded webserver.

  2. Alternatively, you can export some search paths to help the compiler find the librdkafka headers and lib, [like so][].

[like so]: https://github.com/Parsely/pykafka/blob/cec20cb79e9a899b67353c1b9da782526144af49/.travis.yml#L38-L40

  3. "Why?!" you ask? I guess benchmarking is hard to do fairly, and these choices simply gave the least jittery numbers. Benchmarking already initialised consumers suggested gains for the pykafka.rdkafka flavour of anywhere between 10x and over a 40x. [benchmark_issue]: https://github.com/Parsely/pykafka/issues/378 [docs_resource]: https://docs.python.org/2/library/resource.html
