Skip to content

Instantly share code, notes, and snippets.

@aaronhanson
Last active December 8, 2016 22:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aaronhanson/797f6c4f5a5289b1b956834ee2d1ffa5 to your computer and use it in GitHub Desktop.
Save aaronhanson/797f6c4f5a5289b1b956834ee2d1ffa5 to your computer and use it in GitHub Desktop.
Timed Kafka Consumer
#!/usr/bin/env python
from kafka import KafkaConsumer
import os
import sys
import time
import argparse
class TimedConsumer(object):
    """Read messages from one Kafka topic at a fixed interval for a fixed duration.

    Auto-commit is disabled; offsets are committed explicitly after each
    message that is read.

    Keyword arguments (the names produced by ``setup_cli_params``):
        topic: topic to subscribe to.
        broker_list: bootstrap server(s), e.g. ``localhost:9092``.
        group_id: consumer group to read and commit offsets as.
        interval: seconds to sleep between reads (str or number).
        duration: total seconds to keep reading (str or number).
        api_version: passed through to ``KafkaConsumer`` unchanged.
        consumer_timeout_ms: optional; how long a single read may block
            before giving up. Defaults to 60000, the previously
            hard-coded value.

    Positional arguments are accepted but ignored.
    """

    # Default upper bound, in milliseconds, on how long one read waits for a message.
    DEFAULT_CONSUMER_TIMEOUT_MS = 60000

    def __init__(self, *args, **kwargs):
        self.topic = kwargs.get('topic')
        self.bootstrap_servers = kwargs.get('broker_list')
        self.group_id = kwargs.get('group_id')
        self.interval = kwargs.get('interval')
        self.duration = kwargs.get('duration')
        # NOTE(review): newer kafka-python releases expect api_version as a
        # tuple such as (0, 8, 2) rather than the string the CLI supplies;
        # confirm against the installed client version.
        self.api_version = kwargs.get('api_version')
        self.consumer_timeout_ms = kwargs.get('consumer_timeout_ms',
                                              self.DEFAULT_CONSUMER_TIMEOUT_MS)
        self.consumer = KafkaConsumer(self.topic,
                                      bootstrap_servers=self.bootstrap_servers,
                                      group_id=self.group_id,
                                      api_version=self.api_version,
                                      # Offsets are committed manually in consume_messages().
                                      enable_auto_commit=False,
                                      consumer_timeout_ms=self.consumer_timeout_ms)

    def consume_messages(self):
        """Read one message per interval until ``duration`` seconds have elapsed.

        Each iteration does the following:
          * waits for at most one read timeout for the next message;
          * prints ``partition:offset - value``, or a timeout notice;
          * commits offsets if a message was read;
          * sleeps ``interval`` seconds.

        The deadline is only checked between iterations. A run can therefore
        overshoot ``duration`` by up to one read timeout plus one interval.
        The consumer is always closed when this method returns or raises.
        """
        print('reading messages every {}s for {}s'.format(self.interval, self.duration))
        try:
            # Parse once, inside the try, so a bad value still closes the consumer.
            interval = float(self.interval)
            deadline = time.time() + float(self.duration)
            while time.time() < deadline:
                try:
                    # Builtin next() uses the iterator protocol on both Python 2
                    # and 3; a .next() method is not guaranteed to exist.
                    msg = next(self.consumer)
                except StopIteration:
                    # The consumer gives up once its read timeout expires.
                    msg = None
                if msg is None:
                    print('no message received by timeout')
                else:
                    print('{0}:{1} - {2}'.format(msg.partition, msg.offset, msg.value))
                    # Only commit when a message was actually read.
                    self.consumer.commit()
                time.sleep(interval)
        finally:
            self.consumer.close()
        print('done')
def setup_cli_params():
    """Build the command-line parser for the timed Kafka consumer.

    The destination names (topic, group_id, broker_list, duration, interval,
    api_version) match the keyword arguments ``TimedConsumer`` reads. The
    parsed namespace can therefore be passed to it via ``vars(...)``.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    parser = argparse.ArgumentParser(description='Read messages from Kafka topic.')
    parser.add_argument('-t', '--topic', required=True,
                        help='the topic to read messages from')
    parser.add_argument('-g', '--group_id', required=True,
                        help='the consumer group to read messages as')
    parser.add_argument('-b', '--broker_list', required=True,
                        help='list of bootstrap brokers e.g. localhost:9092')
    # type=float reports a non-numeric value as a usage error at parse time,
    # instead of a ValueError partway through a run. argparse also applies
    # type= to string defaults, so the defaults arrive as floats too.
    parser.add_argument('-d', '--duration', type=float, default='600',
                        help='duration in seconds to consume (default: 600)')
    parser.add_argument('-i', '--interval', type=float, default='2.0',
                        help='interval in seconds to delay consuming messages (default: 2.0)')
    parser.add_argument('--api_version', default='0.8.2',
                        help='broker api version to use (default: 0.8.2)')
    return parser
# Only run when executed as a script; importing this module must not
# connect to Kafka or parse sys.argv.
if __name__ == '__main__':
    parser = setup_cli_params()
    args = vars(parser.parse_args())
    TimedConsumer(**args).consume_messages()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment