Skip to content

Instantly share code, notes, and snippets.

@satendrakumar
Last active January 10, 2018 04:40
Show Gist options
  • Save satendrakumar/e75978251704273e07e20917107adcc8 to your computer and use it in GitHub Desktop.
Save satendrakumar/e75978251704273e07e20917107adcc8 to your computer and use it in GitHub Desktop.
# Confluent Kafka (0.8.2) consumer
import ast
import json

from confluent_kafka import Consumer
from confluent_kafka import TopicPartition
class KafkaConsumer:
    """Read messages from explicitly assigned partitions of one Kafka topic.

    The underlying ``confluent_kafka.Consumer`` is configured with a broker
    version fallback of ``0.8.2`` and with the API version request disabled.
    """

    def __init__(self, topic, hosts, partitions, group_id='test-group'):
        """Create the consumer and assign it to the given partitions.

        Args:
            topic: Name of the topic to read from.
            hosts: Value passed as ``bootstrap.servers``; also concatenated
                into a log line by ``read``, so it is expected to be a string.
            partitions: Iterable of partition ids of ``topic`` to assign.
            group_id: Value passed as ``group.id``. Defaults to the
                previously hard-coded ``'test-group'``.
        """
        self.hosts = hosts
        self.c = Consumer({
            'bootstrap.servers': hosts,
            'group.id': group_id,
            'default.topic.config': {'auto.offset.reset': 'smallest'},
            'api.version.request': False,
            'broker.version.fallback': '0.8.2',
        })
        # Manual assignment: one TopicPartition per requested partition id.
        tps = [TopicPartition(topic, partition_id)
               for partition_id in partitions]
        self.c.assign(tps)

    @staticmethod
    def _parse_payload(text):
        """Turn a decoded message payload into a Python object.

        SECURITY: this used to be ``eval(str(text))``, which executes
        whatever a producer puts on the topic. It also failed on valid JSON
        containing ``true``/``false``/``null``. The payload is now parsed as
        JSON first; Python-literal payloads (e.g. single-quoted dicts) that
        the old ``eval`` accepted are still handled via ``ast.literal_eval``,
        which only evaluates literals and never calls functions.
        """
        try:
            return json.loads(text)
        except ValueError:
            # Not JSON; fall back to a plain Python literal.
            return ast.literal_eval(text)

    def read(self, timeout=3.0):
        """Poll for one message and return its parsed payload.

        Args:
            timeout: Maximum time to block in ``poll``. Defaults to the
                previously hard-coded 3.0.

        Returns:
            The parsed payload, or ``None`` when nothing was received, the
            received message is falsy, the message carries an error, or any
            exception occurred while reading/parsing (best-effort read).
        """
        try:
            print("Reading....... from " + self.hosts)
            message = self.c.poll(timeout=timeout)
            # Truthiness check kept from the original implementation.
            if not message:
                return None
            if message.error():
                return None
            message_json = message.value().decode('utf-8')
            print("Message from kafka " + message_json)
            return self._parse_payload(message_json)
        except Exception as exc:
            # Best-effort: report the failure with its cause, yield None.
            print("Error while reading the message: " + repr(exc))
            return None

    def close(self):
        """Close the underlying consumer."""
        self.c.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment