Last active
December 8, 2016 22:26
-
-
Save aaronhanson/cc2b9e3d042fcf8b129e849ed44b5f62 to your computer and use it in GitHub Desktop.
Timed Kafka Producer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
import argparse
import binascii
import os
import time

from kafka import KafkaProducer
class TimedProducer(object):
    """Send small keyed test messages to a Kafka topic at a fixed interval.

    Construction opens a ``KafkaProducer``; ``produce_messages`` then sends
    one message per interval until the configured duration has elapsed and
    closes the producer afterwards.

    Recognised keyword arguments (all read with ``kwargs.get``, so a missing
    one becomes ``None``):
        topic: topic name handed to ``producer.send``.
        broker_list: value passed as ``bootstrap_servers``.
        duration: how long to keep producing, in seconds (anything
            ``float()`` accepts).
        interval: pause between messages, in seconds (anything ``float()``
            accepts).
        api_version: broker API version, either a dotted string such as
            ``'0.8.2'`` or an already-built tuple.
    """

    def __init__(self, *args, **kwargs):
        """Store the settings and create the underlying ``KafkaProducer``.

        Positional arguments are accepted for signature compatibility but
        are not used.

        Raises:
            ValueError: if ``api_version`` is a string that is not
                dot-separated integers (or ``'auto'``).
        """
        self.topic = kwargs.get('topic')
        self.bootstrap_servers = kwargs.get('broker_list')
        self.duration = kwargs.get('duration')
        self.interval = kwargs.get('interval')
        # Kept exactly as supplied; only the value handed to the client is
        # normalised.
        self.api_version = kwargs.get('api_version')
        self.producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            api_version=self._parse_api_version(self.api_version),
        )

    @staticmethod
    def _parse_api_version(value):
        """Convert a dotted version string such as '0.8.2' to ``(0, 8, 2)``.

        kafka-python documents ``api_version`` as a tuple and only accepts
        strings as a deprecated form, so strings are converted up front.
        ``'auto'`` maps to ``None``; non-string values (tuples, ``None``)
        are returned unchanged.

        Raises:
            ValueError: if a string component is not an integer.
        """
        if not isinstance(value, (str, type(u''))):
            return value
        if value == 'auto':
            return None
        return tuple(int(part) for part in value.split('.'))

    @staticmethod
    def _make_key():
        """Return 16 random bytes encoded as 32 ASCII hex digits (bytes)."""
        # hexlify works on both Python 2 and 3, unlike str.encode('hex').
        return binascii.hexlify(os.urandom(16))

    def produce_messages(self):
        """Send one message per interval until the duration elapses.

        Each message has a fresh random hex key and the value
        ``b'message <key>'``. After every send the partition, offset and key
        are printed. The producer is closed when the loop ends, including
        when a send raises or the run is interrupted.

        Raises:
            ValueError / TypeError: if ``interval`` or ``duration`` cannot be
                converted with ``float()``.
        """
        # Convert once instead of on every iteration.
        interval = float(self.interval)
        deadline = time.time() + float(self.duration)
        print('sending messages every {} for {}'.format(self.interval, self.duration))
        try:
            while time.time() < deadline:
                msg_key = self._make_key()
                # .get() waits for the send result so that partition/offset
                # are available to report.
                metadata = self.producer.send(
                    self.topic,
                    key=msg_key,
                    value=b'message ' + msg_key,
                ).get()
                print('{}:{} - {}'.format(metadata.partition, metadata.offset,
                                          msg_key.decode('ascii')))
                time.sleep(interval)
        finally:
            # Always release the client, even on errors or Ctrl-C.
            self.producer.close()
        print('done')
def _non_negative_float(text):
    """argparse ``type`` callback: parse *text* as a float that is >= 0."""
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError('{!r} is not a number'.format(text))
    # "not >= 0" also rejects NaN, which compares false against everything.
    if not value >= 0:
        raise argparse.ArgumentTypeError('{!r} must be >= 0'.format(text))
    return value


def setup_cli_params():
    """Build the command-line parser for the timed producer.

    Options:
        -b/--broker_list (required): bootstrap brokers.
        -t/--topic (required): topic to send to.
        -i/--interval: seconds between messages, default 1.0.
        -d/--duration: seconds to keep producing, default 600.
        --api_version: broker API version string, default '0.8.2'.

    ``interval`` and ``duration`` are validated here and parsed to floats so
    that bad input is reported as a usage error before any producer is
    created.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(
        description='Produce messages to a Kafka topic at a fixed interval.')
    parser.add_argument('-b', '--broker_list', required=True,
                        help='list of bootstrap brokers e.g. localhost:9092')
    parser.add_argument('-t', '--topic', required=True,
                        help='the topic to send messages to')
    parser.add_argument('-i', '--interval', type=_non_negative_float, default=1.0,
                        help='interval in seconds between messages (default: 1.0)')
    parser.add_argument('-d', '--duration', type=_non_negative_float, default=600.0,
                        help='duration in seconds to produce for (default: 600)')
    parser.add_argument('--api_version', default='0.8.2',
                        help='broker api version to use (default: 0.8.2)')
    return parser
# Run only when executed as a script, so importing this module (for reuse or
# testing) does not parse sys.argv or start producing.
if __name__ == '__main__':
    parser = setup_cli_params()
    # vars() turns the argparse namespace into keyword arguments whose names
    # match the keys TimedProducer reads.
    args = vars(parser.parse_args())
    TimedProducer(**args).produce_messages()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment