Skip to content

Instantly share code, notes, and snippets.

@tatiana
Last active August 29, 2015 14:02
Show Gist options
  • Save tatiana/eaceb9975b3748e1c2a6 to your computer and use it in GitHub Desktop.
Save tatiana/eaceb9975b3748e1c2a6 to your computer and use it in GitHub Desktop.
Example of how to process kafka messages which are in Avro format
"""
Example of how to consume messages from Kafka, through Zookeeper.
Compatible with:
- Python 2.7.x
- Kafka 0.7.1
- Zookeeper 3.4.5
Requires the following Python packages, available at pypi.python.org:
fastavro==0.7.8
kazoo==1.3.1
samsa==0.3.11
"""
import sys
from cStringIO import StringIO
import fastavro as avro
from kazoo.client import KazooClient
from samsa.cluster import Cluster
DOMAINS = [
"zookeeper.kafka.server1.com",
"zookeeper.kafka.server2.com"
]
PORT = 2181
TOPIC = 'some-topic' # pre-defined
GROUP = 'my-app' # custom according to consumer
zookeeper_hosts = ",".join(["{0}:{1}".format(domain, PORT) for domain in DOMAINS])
zookeeper = KazooClient(hosts=zookeeper_hosts)
zookeeper.start()
cluster = Cluster(zookeeper)
topic = cluster.topics.get(TOPIC)
consumer = topic.subscribe(GROUP)
for message in consumer:
fp = StringIO(message)
data = avro.reader(fp)
for item in data:
print(item)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment