Last active
December 8, 2016 22:25
-
-
Save aaronhanson/797f6c4f5a5289b1b956834ee2d1ffa5 to your computer and use it in GitHub Desktop.
Timed Kafka Consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from kafka import KafkaConsumer | |
import os | |
import sys | |
import time | |
import argparse | |
class TimedConsumer():
    """Read one Kafka message per interval, for a fixed total duration.

    Settings are taken from keyword arguments (looked up with
    ``kwargs.get``, so a missing key is stored as ``None``):

    topic       -- topic the consumer subscribes to
    broker_list -- handed to KafkaConsumer as ``bootstrap_servers``
    group_id    -- consumer group id
    interval    -- seconds to sleep after each read attempt
    duration    -- total seconds to keep reading
    api_version -- handed to KafkaConsumer unchanged

    ``interval`` and ``duration`` may be numbers or numeric strings; they
    are converted with ``float()`` when consumption starts.
    """

    def __init__(self, *args, **kwargs):
        """Store the settings and create the underlying KafkaConsumer.

        Positional arguments are accepted but ignored.
        """
        self.topic = kwargs.get('topic')
        self.bootstrap_servers = kwargs.get('broker_list')
        self.group_id = kwargs.get('group_id')
        self.interval = kwargs.get('interval')
        self.duration = kwargs.get('duration')
        self.api_version = kwargs.get('api_version')
        # Auto-commit is off because consume_messages() commits explicitly
        # after every read attempt. The 60 s consumer timeout is what lets
        # iteration end with StopIteration when no message arrives.
        self.consumer = KafkaConsumer(self.topic,
                                      bootstrap_servers=self.bootstrap_servers,
                                      group_id=self.group_id,
                                      api_version=self.api_version,
                                      enable_auto_commit=False,
                                      consumer_timeout_ms=60000)

    def consume_messages(self):
        """Poll the consumer until ``duration`` seconds have elapsed.

        Each pass fetches at most one message, prints it (or a timeout
        notice), commits, then sleeps for ``interval`` seconds. The
        consumer is always closed on the way out, including when an
        error or KeyboardInterrupt interrupts the loop.

        Raises:
            ValueError/TypeError: if ``interval`` or ``duration`` cannot
                be converted to ``float``.
        """
        print('reading messages every {}s for {}s'.format(self.interval, self.duration))
        try:
            # Convert once instead of on every loop test.
            pause = float(self.interval)
            deadline = time.time() + float(self.duration)
            while time.time() < deadline:
                try:
                    # Builtin next() works on Python 2 and 3; the
                    # ``.next()`` method spelling is Python 2 only.
                    msg = next(self.consumer)
                except StopIteration:
                    # The consumer timed out without delivering anything.
                    msg = None
                if msg is not None:
                    print('{0}:{1} - {2}'.format(msg.partition, msg.offset, msg.value))
                else:
                    print('no message received by timeout')
                self.consumer.commit()
                time.sleep(pause)
        finally:
            # Release the consumer even if the loop exits abnormally.
            self.consumer.close()
        print('done')
def setup_cli_params():
    """Build the command-line parser for the timed consumer script.

    Required options: ``-t/--topic``, ``-g/--group_id``, ``-b/--broker_list``.
    Optional options (all kept as strings): ``-d/--duration`` (default
    ``'600'``), ``-i/--interval`` (default ``'2.0'``) and ``--api_version``
    (default ``'0.8.2'``).

    Returns:
        argparse.ArgumentParser: the configured parser; the caller is
        responsible for invoking ``parse_args()``.
    """
    parser = argparse.ArgumentParser(description='Read messages from Kafka topic.')
    parser.add_argument('-t', '--topic', required=True,
                        help='the topic to read messages from')
    parser.add_argument('-g', '--group_id', required=True,
                        help='the consumer group to read messages as')
    parser.add_argument('-b', '--broker_list', required=True,
                        help='list of bootstrap brokers e.g. localhost:9092')
    parser.add_argument('-d', '--duration', default='600',
                        help='duration in seconds to consume (default: 600)')
    parser.add_argument('-i', '--interval', default='2.0',
                        help='interval in seconds to delay consuming messages (default: 2.0)')
    parser.add_argument('--api_version', default='0.8.2',
                        help='broker api version to use (default: 0.8.2)')
    return parser
# Script body: parse the command line, then run the consumer with those
# options. The parsed namespace is flattened to a dict so that every option
# reaches TimedConsumer as a keyword argument.
parser = setup_cli_params()
args = vars(parser.parse_args())
timed_consumer = TimedConsumer(**args)
timed_consumer.consume_messages()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment