Skip to content

Instantly share code, notes, and snippets.

@cipri7329
Last active September 16, 2016 14:03
Show Gist options
  • Save cipri7329/37b39bb358a384018e67cc336abb200e to your computer and use it in GitHub Desktop.
Save cipri7329/37b39bb358a384018e67cc336abb200e to your computer and use it in GitHub Desktop.
python kafka consumer
import time
__author__ = 'user'
import base64
import json
from kafka import KafkaConsumer
from kafka import TopicPartition
# Simple Kafka "tail" for debugging: prints every record on one partition of a
# topic, then base64-decodes each field of the record's JSON payload.
# Written for Python 3.

# Alternative topic this script has been used with: "frontier-done".
KAFKA_TOPIC = "scraped-data"
KAFKA_HOST = "localhost:9092"


def parse_payload(raw):
    """Parse a Kafka message value as a JSON document.

    :param raw: the message value as delivered by the consumer
        (``bytes``, ``str`` or ``None``).
    :returns: the decoded JSON value (not necessarily a dict).
    :raises ValueError: if *raw* is ``None`` (e.g. a tombstone record),
        is not valid UTF-8, or is not valid JSON.
    """
    if raw is None:
        raise ValueError("message has no value")
    if isinstance(raw, bytes):
        # Decode explicitly rather than relying on json.loads accepting bytes.
        raw = raw.decode("utf-8")
    return json.loads(raw)


def decode_field(value):
    """Return the base64-encoded *value* decoded to text.

    Bytes that are not valid UTF-8 are replaced with U+FFFD so binary
    payloads can still be printed.

    :raises TypeError: if *value* is not a str/bytes (e.g. a JSON number,
        list, object or null).
    :raises ValueError: if *value* is not valid base64
        (``binascii.Error`` is a ``ValueError`` subclass).
    """
    return base64.b64decode(value).decode("utf-8", "replace")


def print_fields(data):
    """Print the rowkey, the raw document, then every field base64-decoded.

    If *data* is not a mapping with a ``rowkey`` entry, the error and the raw
    document are printed instead. A field that cannot be decoded is reported
    inline and does not stop the remaining fields from being printed.
    """
    try:
        print("rowkey=%s\n%s" % (data['rowkey'], data))
    except (KeyError, TypeError) as ex:
        # KeyError: no rowkey; TypeError: payload is not a JSON object.
        print(ex)
        print("dumping message: " + str(data))
        return
    for key, value in data.items():
        try:
            decoded = decode_field(value)
        except (TypeError, ValueError) as ex:
            decoded = "<not base64 (%s): %r>" % (ex, value)
        print("%s : %s" % (key, decoded))
    print("")


def handle_message(message):
    """Print one consumed record: local timestamp, metadata, decoded payload.

    Any problem with a single record is reported on stdout and does not stop
    the listener.

    :param message: a consumer record exposing ``topic``, ``partition``,
        ``offset``, ``key`` and ``value`` attributes.
    """
    print(time.strftime("%Y%m%d-%H%M%S", time.localtime()))
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
    try:
        data = parse_payload(message.value)
    except ValueError as ex:
        print(ex)
        print("dumping message: " + str(message.value))
    else:
        print_fields(data)
    print("----------")


def main(topic=KAFKA_TOPIC, host=KAFKA_HOST, partition=0):
    """Tail *partition* of *topic* on the broker at *host*, printing records.

    Runs until interrupted with Ctrl-C. The consumer is always closed on exit.
    """
    consumer = KafkaConsumer(bootstrap_servers=host)
    try:
        consumer.assign([TopicPartition(topic, partition)])
        print("start listening to: " + topic)
        for message in consumer:
            handle_message(message)
    except KeyboardInterrupt:
        # Iteration blocks indefinitely by default (no consumer_timeout_ms),
        # so Ctrl-C is the normal way to stop; treat it as a clean shutdown.
        pass
    finally:
        consumer.close()
    print("kafka-topic listener finished")


if __name__ == "__main__":
    main()
@cipri7329
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment