Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import json
from kafka import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener
class TopicSubChanged(ConsumerRebalanceListener):
    """Rebalance listener that reports partition ownership changes on stdout.

    An instance is handed to ``consumer.subscribe(listener=...)``; the
    consumer is then expected to invoke these callbacks around group
    rebalances (see the kafka-python ConsumerRebalanceListener docs for the
    exact timing guarantees).
    """

    def on_partitions_revoked(self, revoked):
        """Print the partitions this consumer is giving up.

        Args:
            revoked: Partitions being taken away from this consumer, as
                supplied by the kafka client.
        """
        print("Lost partitions: %s" % repr(revoked))

    def on_partitions_assigned(self, assigned):
        """Print the partitions this consumer has just been given.

        Args:
            assigned: Partitions now owned by this consumer, as supplied by
                the kafka client.
        """
        # Report the `assigned` argument; the earlier code referenced the
        # undefined name `revoked` here and raised NameError on every call.
        print("Acquired Partitions: %s" % repr(assigned))
# Build ONE consumer that carries every setting. Previously the offset-reset
# and auto-commit options were passed to a separate KafkaConsumer(...) call
# whose result was thrown away, so they never applied to `consumer` (and the
# throwaway client was left open).
consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",
    group_id="activity_history_listener",
    # NOTE(review): newer kafka-python releases document api_version as a
    # tuple such as (0, 9); kept as the original string to stay compatible
    # with whichever version this script was written against -- confirm.
    api_version="0.9",
    auto_offset_reset="earliest",
    enable_auto_commit=True,
)

# Subscribe with a listener so partition (re)assignments are printed.
consumer.subscribe(topics=["activity.history"], listener=TopicSubChanged())

# Show the topics visible to this consumer as a quick connectivity check.
print(consumer.topics())
# Iterate over records from the consumer; print each record and its payload.
for msg in consumer:
    print("Message (%s):\n\t" % repr(msg))
    try:
        # Decode the whole payload. The earlier `msg.value[-1]` handed only
        # the final byte/character to the JSON parser, and raised an
        # uncaught IndexError on an empty payload.
        print(json.loads(msg.value))
    except (TypeError, ValueError):
        # Not JSON (ValueError) or no payload at all, e.g. value is None
        # (TypeError): fall back to showing the raw value unchanged.
        print(msg.value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment