Skip to content

Instantly share code, notes, and snippets.

@soscler
Last active March 24, 2019 17:06
Show Gist options
  • Save soscler/fd55b38cf0a51d940edb90c13bfee8e7 to your computer and use it in GitHub Desktop.
Save soscler/fd55b38cf0a51d940edb90c13bfee8e7 to your computer and use it in GitHub Desktop.
Kafka consumer in Python
from kafka import KafkaConsumer
from threading import Thread
import threading
import time
# To consume latest messages and auto-commit offsets
class Consumer(Thread):
    """Thread that reads messages from one Kafka topic and prints them.

    Args:
        topic: Name of the topic passed to ``KafkaConsumer``.
        server: Bootstrap server address (e.g. ``'localhost:9092'``).
        groupId: Consumer group id passed to ``KafkaConsumer``.
    """

    def __init__(self, topic, server, groupId):
        super().__init__()
        self.topic = topic
        self.bootstrapServer = server
        self.groupId = groupId

    def consume(self):
        """Create a ``KafkaConsumer`` and print every message it yields.

        Errors raised while creating or iterating the consumer are reported
        on stdout and not re-raised. The consumer, if it was created, is
        closed before this method returns.
        """
        print("Inside consume method...")
        # TODO: Subscribe to the topic explicitly and poll with a timeout,
        # e.g. consumer.subscribe(topic) / consumer.poll(timeout_ms=1000).
        consumer = None
        try:
            # group_id takes the group id and bootstrap_servers the broker
            # address; the original had these two attributes crossed over.
            consumer = KafkaConsumer(self.topic,
                                     group_id=self.groupId,
                                     bootstrap_servers=[self.bootstrapServer])
            for message in consumer:
                # message value and key are raw bytes -- decode if necessary!
                # e.g., for unicode: `message.value.decode('utf-8')`
                print("%s:%d:%d: key=%s value=%s" % (message.topic,
                                                     message.partition,
                                                     message.offset,
                                                     message.key,
                                                     message.value))
        except Exception as exc:
            # Exception rather than a bare except, so SystemExit and
            # KeyboardInterrupt are no longer swallowed; the cause is shown.
            print("Problem encounter while connecting to kafka")
            print("Cause: %r" % (exc,))
        finally:
            if consumer is not None:
                consumer.close()

    def run(self):
        """Thread entry point: announce the start, then consume messages."""
        print("About fetch data in kafka...")
        self.consume()
        # TODO: Take in account when the thread stop and the callback


topic = 'topic1'
server = 'localhost:9092'
groupId = 'DemoConsumer'
# Argument order matches Consumer.__init__(topic, server, groupId).
kafkaThread = Consumer(topic, server, groupId)
kafkaThread.start()
# TODO: safe delete the thread
print("Listing threads...")
# The list returned by enumerate() was previously discarded; print it.
print(threading.enumerate())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment