@soscler
Last active March 24, 2019 17:06
kafka consumer
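from kafka import KafkaConsumer
from kafka.errors import KafkaError

topic = 'topic1'
server = 'localhost:9092'
groupId = 'DemoConsumer'

print("Inside consume method...")
try:
    # Passing the topic to the constructor subscribes the consumer to it.
    consumer = KafkaConsumer(topic,
                             group_id=groupId,
                             bootstrap_servers=[server])
    # consumer.subscribe(topic)
    # messages = consumer.poll(timeout_ms=1000)
    # Iterating over the consumer blocks and yields records as they arrive.
    for message in consumer:
        # message value and key are raw bytes -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        # Process message
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
except KafkaError:
    # Assumed intent: the final print reads like an error handler, so it is
    # placed in an except branch (e.g. NoBrokersAvailable on connect).
    print("Problem encountered while connecting to Kafka")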
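
The comment in the loop notes that keys and values arrive as raw bytes. A minimal sketch of that decoding step, assuming UTF-8 text payloads, might look like the following. The names `DecodedRecord` and `decode_record` are hypothetical helpers introduced here for illustration; they are not part of kafka-python or of the original gist.

```python
from typing import NamedTuple, Optional


class DecodedRecord(NamedTuple):
    """Hypothetical container for a record's decoded key and value."""
    key: Optional[str]
    value: Optional[str]


def decode_record(key: Optional[bytes], value: Optional[bytes],
                  encoding: str = "utf-8") -> DecodedRecord:
    """Decode raw key/value bytes; None (nothing set) is passed through."""
    return DecodedRecord(
        key=key.decode(encoding) if key is not None else None,
        value=value.decode(encoding) if value is not None else None,
    )
```

Inside the loop this could be called as `decode_record(message.key, message.value)`. Alternatively, `KafkaConsumer` accepts `key_deserializer` and `value_deserializer` callables, which move the decoding into the consumer itself.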