Skip to content

Instantly share code, notes, and snippets.

@aaronhanson
Last active December 8, 2016 22:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aaronhanson/cc2b9e3d042fcf8b129e849ed44b5f62 to your computer and use it in GitHub Desktop.
Save aaronhanson/cc2b9e3d042fcf8b129e849ed44b5f62 to your computer and use it in GitHub Desktop.
Timed Kafka Producer
#!/usr/bin/env python
from kafka import KafkaProducer
import argparse
import os
import time
class TimedProducer():
def __init__(self, *args, **kwargs):
self.topic = kwargs.get('topic')
self.bootstrap_servers = kwargs.get('broker_list')
self.duration = kwargs.get('duration')
self.interval = kwargs.get('interval')
self.api_version = kwargs.get('api_version')
self.producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers,
api_version=self.api_version)
def produce_messages(self):
start_time = time.time()
print('sending messages every {} for {}'.format(self.interval, self.duration))
while time.time() < start_time + float(self.duration):
msg_key = os.urandom(16).encode('hex')
metadata = self.producer.send(
self.topic,
key=b'{}'.format(msg_key),
value=b'message {}'.format(msg_key)
).get()
print('{}:{} - {}'.format(metadata.partition, metadata.offset, msg_key))
time.sleep(float(self.interval))
self.producer.close()
print('done')
def setup_cli_params():
parser = argparse.ArgumentParser(description='Read messages from Kafka topic.')
parser.add_argument('-b', '--broker_list', required=True,
help='list of bootstrap brokers e.g. localhost:9092')
parser.add_argument('-t', '--topic', required=True,
help='the topic to read messages from')
parser.add_argument('-i', '--interval', default='1.0',
help='interval in seconds between messages (default: 1.0)')
parser.add_argument('-d', '--duration', default='600',
help='duration in seconds to produce for (default: 600)')
parser.add_argument('--api_version', default='0.8.2',
help='broker api version to use (default: 0.8.2)')
return parser
parser = setup_cli_params()
args = vars(parser.parse_args())
TimedProducer(**args).produce_messages()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment