Skip to content

Instantly share code, notes, and snippets.

@jasonrhaas
Last active October 23, 2015 16:09
Show Gist options
  • Save jasonrhaas/2281296a538908cfa85b to your computer and use it in GitHub Desktop.
Save jasonrhaas/2281296a538908cfa85b to your computer and use it in GitHub Desktop.
from pykafka import KafkaClient
import logging
# Configure the root logger at INFO level as soon as this module is loaded.
logging.basicConfig(level=logging.INFO)
class KafkaConnect(object):
    """Small convenience wrapper around a pykafka client for a single topic.

    Typical use: construct, call :meth:`setup` to connect, then iterate over
    :meth:`consume`.

    Attributes:
        kafka_hosts: Broker host string passed to ``KafkaClient``.
        zk_hosts: ZooKeeper connect string passed to the balanced consumer.
        topic: The topic name until :meth:`setup` has run; afterwards the
            topic object looked up on the client.
        client: The ``KafkaClient`` instance; only set by :meth:`setup`.
        balanced_consumer: The consumer created by the most recent call to
            :meth:`consume`; only set once that generator has started.
    """

    def __init__(self,
                 kafka_hosts='localhost:9092',
                 zk_hosts='localhost:2181',
                 topic='default'):
        """Store connection settings; no network activity happens here.

        Args:
            kafka_hosts: Broker host string, e.g. ``'localhost:9092'``.
            zk_hosts: ZooKeeper connect string, e.g. ``'localhost:2181'``.
            topic: Name of the topic to look up in :meth:`setup`.
        """
        self.kafka_hosts = kafka_hosts
        self.zk_hosts = zk_hosts
        self.topic = topic

    def setup(self):
        """Create the Kafka client and resolve ``self.topic`` to a topic object.

        Safe to call more than once (e.g. to reconnect): if ``self.topic``
        has already been replaced by a topic object, its ``name`` attribute
        is used for the new lookup instead of the object itself.

        Any exception raised by ``KafkaClient`` or by the topic lookup
        propagates to the caller unchanged.
        """
        self.client = KafkaClient(self.kafka_hosts)
        # After a previous setup() self.topic is a topic object rather than a
        # name; fall back to the raw value when there is no ``name`` attribute.
        topic_name = getattr(self.topic, 'name', self.topic)
        self.topic = self.client.topics[topic_name]

    def consume(self, consumer='default'):
        """Yield messages from the topic using a balanced consumer, forever.

        :meth:`setup` must have been called first so that ``self.topic`` is a
        topic object.

        Args:
            consumer: Consumer group name passed as ``consumer_group``.

        Yields:
            Whatever ``balanced_consumer.consume()`` returns on each call.
            NOTE(review): because ``consumer_timeout_ms=1000`` is set, pykafka
            documents that ``consume()`` returns ``None`` when no message
            arrives in time, so callers should be prepared for ``None``.

        The generator never finishes on its own. When it is closed (or
        garbage collected) the balanced consumer is stopped so it does not
        outlive the iteration.
        """
        self.balanced_consumer = self.topic.get_balanced_consumer(
            consumer_group=consumer,
            auto_commit_enable=True,
            zookeeper_connect=self.zk_hosts,
            consumer_timeout_ms=1000
        )
        try:
            while True:
                yield self.balanced_consumer.consume()
        finally:
            # Runs on generator.close() / GeneratorExit and on errors raised
            # by consume(); releases the consumer instead of leaking it.
            self.balanced_consumer.stop()
if __name__ == '__main__':
    # Demo: connect to a local broker and print one message value per
    # press of Enter.
    kafka_hosts = 'localhost:9092'
    zk_hosts = 'localhost:2181'
    topic = 'default'
    consumer = 'pykafka-test'

    # Use whichever interactive prompt this interpreter provides.
    try:
        prompt = raw_input  # Python 2
    except NameError:
        prompt = input  # Python 3

    k = KafkaConnect(kafka_hosts=kafka_hosts, zk_hosts=zk_hosts, topic=topic)
    k.setup()
    for msg in k.consume(consumer):
        # The consumer is created with a 1000 ms timeout, so the generator
        # may hand back None when the topic is idle; skip those instead of
        # failing on ``None.value``.
        if msg is None:
            continue
        prompt('Hit Enter for next msg')
        # Parenthesised single-argument form works on both Python 2 and 3.
        print(msg.value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment