Created
July 5, 2016 20:04
-
-
Save mikesparr/346070874c66413fa3fca40c88bab008 to your computer and use it in GitHub Desktop.
Consumer helper class using Confluent Kafka Python library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
""" | |
stream_consumer.py | |
Created by Michael Sparr on 2016-01-17. | |
Copyright (c) 2016 Goomzee Corporation. All rights reserved. | |
""" | |
import sys | |
import os | |
import json | |
import uuid | |
import datetime | |
import time | |
import logging | |
import traceback | |
import confluent_kafka as kafka | |
def _printer(msg): | |
"""Test function to print messages from consumer""" | |
print("%s:%d:%d: key=%s value=%r" % (msg.topic(), msg.partition(), | |
msg.offset(), msg.key(), msg.value().decode('utf-8'))) | |
def _on_assign (c, ps): | |
"""Resets the consumer offset""" | |
for p in ps: | |
p.offset=-2 | |
c.assign(ps) | |
class StreamConsumer:
    """Subscribe a confluent_kafka Consumer to topics and poll them in a loop.

    Attributes:
        config: Settings mapping given to the constructor; ``consume`` reads
            its ``'hosts'`` entry to build ``bootstrap.servers``.
        group_id: Value used for the consumer's ``group.id`` setting.
        consumer: Initialised to ``None``; nothing in this class assigns it.
    """

    def __init__(self, cfg, group_id=None):
        """Store the settings; no connection is opened until ``consume``.

        Args:
            cfg: Mapping with a ``'hosts'`` entry (iterable of broker
                addresses, each converted with ``str``).
            group_id: Consumer group id passed to the client as ``group.id``.
        """
        self.config = cfg
        self.group_id = group_id
        self.consumer = None

    def __get_logger(self):
        """Return a logger named after this source file's base name."""
        return logging.getLogger(os.path.basename(__file__))

    def consume(self, topics, auto_offset='latest', processor=_printer, *args):
        """Connect, subscribe to ``topics`` and hand each message to ``processor``.

        Polls until a poll/processor error or a keyboard interrupt occurs,
        then closes the consumer. Failures are logged, never raised (except
        ``SystemExit``, which propagates after the consumer is closed).

        Args:
            topics: List of topic names to subscribe to.
            auto_offset: Value for ``auto.offset.reset``. For ``'earliest'``
                or ``'smallest'`` the assigned partitions are additionally
                rewound through the ``_on_assign`` callback.
            processor: Callable invoked as ``processor(msg)`` for every
                message that carries no error.
            *args: Accepted for backward compatibility; not forwarded to
                ``processor``.

        Returns:
            None, in every case.
        """
        logger = self.__get_logger()
        consumer = None
        try:
            conf = {
                'bootstrap.servers': ','.join(map(str, self.config.get('hosts'))),
                'group.id': self.group_id,
                'default.topic.config': {'auto.offset.reset': auto_offset},
            }
            try:
                consumer = kafka.Consumer(**conf)
            except Exception:
                logger.error("Error creating Consumer with config [{}]".format(conf))
                logger.debug(traceback.format_exc())
                logger.error("Consumer object missing. Nothing to do!")
                return None

            try:
                if auto_offset in ('earliest', 'smallest'):
                    consumer.subscribe(topics, on_assign=_on_assign)
                else:
                    consumer.subscribe(topics)
            except Exception:
                # The consumer already exists here; the ``finally`` below
                # closes it instead of dropping the reference unclosed.
                logger.error("Error subscribing to topics [{}]".format(topics))
                logger.debug(traceback.format_exc())
                return None

            logger.info("Starting to poll topics [{}]...".format(topics))
            self._poll_until_error(consumer, processor, logger)
        except Exception:
            logger.error("Error consuming topics [{}]".format(topics))
            logger.debug(traceback.format_exc())
        finally:
            self._close_quietly(consumer, logger)
        return None

    def _poll_until_error(self, consumer, processor, logger):
        """Poll ``consumer`` forever, stopping at the first failure or Ctrl-C."""
        while True:
            try:
                msg = consumer.poll()
                if msg is None:
                    # Nothing was delivered by this poll; try again.
                    continue
                err = msg.error()
                if not err:
                    processor(msg)
                elif err.code() == kafka.KafkaError._PARTITION_EOF:
                    # End-of-partition is informational, not a failure.
                    logger.debug("{} [{}] reached end at offset {}".format(
                        msg.topic(), msg.partition(), msg.offset()))
                else:
                    logger.debug("Unknown error [{}]. Quitting...".format(err))
                    raise kafka.KafkaException(err)
            except (Exception, KeyboardInterrupt):
                # KeyboardInterrupt is caught on purpose so Ctrl-C ends the
                # loop and still reaches the close step in ``consume``.
                logger.error("Error polling messages.")
                logger.debug(traceback.format_exc())
                break

    def _close_quietly(self, consumer, logger):
        """Close ``consumer`` if one was created, logging any close failure."""
        if consumer is None:
            return
        try:
            consumer.close()
        except Exception:
            logger.warning("Could not close connection (c). Nothing to do!")
def main():
    """Demo entry point: consume three test topics from a local broker.

    Enables DEBUG logging, builds a ``StreamConsumer`` for group
    ``testgroup`` against ``localhost:9092`` and consumes with the
    ``smallest`` offset policy and the default processor.
    """
    logging.basicConfig(level=logging.DEBUG)

    settings = {"hosts": ["localhost:9092"]}
    stream = StreamConsumer(settings, group_id='testgroup')
    stream.consume(['test', 'test1', 'test2'], auto_offset='smallest')


# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment