Skip to content

Instantly share code, notes, and snippets.

@williamsjj
Created June 6, 2016 23:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save williamsjj/9e67287f0154816c3a733a39ad008437 to your computer and use it in GitHub Desktop.
Save williamsjj/9e67287f0154816c3a733a39ad008437 to your computer and use it in GitHub Desktop.
import json
from kafka import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener
class TopicSubChanged(ConsumerRebalanceListener):
    """Rebalance listener that prints partition changes to stdout.

    An instance is handed to ``KafkaConsumer.subscribe(listener=...)``; the
    consumer then invokes the callbacks below with the affected partitions.
    """

    def on_partitions_revoked(self, revoked):
        """Print the partitions that were taken away from this consumer.

        Args:
            revoked: the partitions passed in by the consumer; only their
                ``repr`` is used.
        """
        # Single-argument print(...) produces the same output under the
        # Python 2 print statement and the Python 3 print function.
        print("Lost partitions: %s" % repr(revoked))

    def on_partitions_assigned(self, assigned):
        """Print the partitions that were handed to this consumer.

        Args:
            assigned: the partitions passed in by the consumer; only their
                ``repr`` is used.
        """
        # Must format `assigned`: `revoked` is not defined in this method,
        # so referencing it here raised NameError on every assignment.
        print("Acquired Partitions: %s" % repr(assigned))
# All options go on the one consumer that is actually used. Passing
# auto_offset_reset / enable_auto_commit to a separate, discarded
# KafkaConsumer(...) call has no effect on this instance and leaves an extra
# client that is never closed.
consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",
    group_id="activity_history_listener",
    api_version="0.9",
    auto_offset_reset="earliest",
    enable_auto_commit=True,
)
consumer.subscribe(topics=["activity.history"], listener=TopicSubChanged())
print(consumer.topics())

try:
    for msg in consumer:
        print("Message (%s):\n\t" % repr(msg))
        try:
            # Parse the whole payload; indexing with [-1] would hand only the
            # final character/byte to json.loads, which is almost never
            # valid JSON.
            print(json.loads(msg.value))
        except ValueError:
            # Not JSON (or not decodable): fall back to the raw payload.
            print(msg.value)
finally:
    # Release the consumer's connections however the loop exits
    # (e.g. Ctrl-C or an unexpected error).
    consumer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment