Skip to content

Instantly share code, notes, and snippets.

@jcrsilva
Created November 13, 2018 08:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jcrsilva/2a0dd9643eea274854e5d99e00a1154a to your computer and use it in GitHub Desktop.
Save jcrsilva/2a0dd9643eea274854e5d99e00a1154a to your computer and use it in GitHub Desktop.
Small Python script that produces a message to a Kafka broker and consumes it back, verifying that the broker is working.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import logging
import sys
import os
import random
import string
from kafka import KafkaConsumer, KafkaProducer
def _exit_with_error(message, code=2):
    """
    Helper function, prints a message to stderr and exits the process

    :param message: text to show before exiting; ``None`` prints nothing
    :param int code: process exit status, defaults to 2 (the status shells
        conventionally use for wrong arguments)
    :return: never returns
    :raises SystemExit: always, carrying ``code``
    """
    if message is not None:
        # write() rather than the print statement so the helper runs on both
        # Python 2 and Python 3; diagnostics belong on stderr, not stdout
        sys.stderr.write("{}\n".format(message))
    sys.exit(code)
def _get_arguments(args=None):
    """
    Gets command line arguments, validates them and parses them into useful
    formats

    :param list args: argument strings to parse; ``None`` (the default) lets
        argparse read ``sys.argv[1:]`` at call time
    :return: Parsed command line arguments. ``verbosity`` is replaced by a
        ``logging`` level constant and ``log_path`` (when given) by its
        absolute path
    :rtype: argparse.Namespace
    :raises SystemExit: with status 2 when the arguments are invalid or the
        log path is not writeable
    """
    parser = argparse.ArgumentParser(
        description="Kafka Producer + Consumer tester"
    )
    parser.add_argument(
        "-t", "--topic",
        help="Topic to produce to, default is 'test'",
        required=False,
        default="test",
    )
    # type= is needed on the numeric options: without it a value given on the
    # command line stays a string and breaks later comparisons / timeouts
    parser.add_argument(
        "-ti", "--timeout",
        help="Timeout to wait for produce/consume operations in seconds, default 30",
        required=False,
        type=float,
        default=30,
    )
    parser.add_argument(
        "-b", "--broker",
        help="Broker connection string, default 'localhost:9092'",
        required=False,
        default="localhost:9092",
    )
    parser.add_argument(
        "-n", "--number",
        help="Number of messages to grab from topic to get our test message, default=30",
        required=False,
        type=int,
        default=30,
    )
    parser.add_argument(
        "-l", "--log-path",
        help="The path to an output log file",
        required=False,
    )
    parser.add_argument(
        "-v", "--verbosity",
        action='count',
        help="Level of verbosity for output log (-v...-vv)",
        required=False,
    )
    args = parser.parse_args(args=args)

    # log path
    if args.log_path:
        args.log_path = os.path.abspath(args.log_path)
        if os.path.exists(args.log_path):
            # Existing target: must be a regular file we are allowed to write
            writeable = (os.path.isfile(args.log_path)
                         and os.access(args.log_path, os.W_OK))
        else:
            # New log file: its directory must exist and allow creating it
            writeable = os.access(os.path.dirname(args.log_path), os.W_OK)
        if not writeable:
            # parser.error prints the usage plus this message to stderr and
            # exits with status 2
            parser.error(
                "Log path '{}' doesnt seem to be writeable".format(args.log_path)
            )

    # verbosity: no flag -> WARNING, -v -> INFO, -vv -> DEBUG, more -> NOTSET
    if not args.verbosity:
        args.verbosity = logging.WARNING
    else:
        levels = {1: logging.INFO, 2: logging.DEBUG}
        args.verbosity = levels.get(args.verbosity, logging.NOTSET)
    return args
def _setup_logging(file_path=None, level=logging.WARNING):
    """
    Sets up the root logger: console output on stdout, an optional log file
    and the logging level

    Safe to call more than once: a handler is only added when the root logger
    does not already have one for the same stream / file, so repeated calls
    do not make every record appear several times.

    :param str file_path: file path for the (optional) log file
    :param int level: logging level as per the `logging module
        <https://docs.python.org/2/library/logging.html#logging-levels>`_
    :return: None
    """
    log_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(name)s: %(message)s")
    root_logger = logging.getLogger('')

    # Console handler, skipped if something already writes to this stdout
    has_console = any(
        getattr(handler, "stream", None) is sys.stdout
        for handler in root_logger.handlers
    )
    if not has_console:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(log_formatter)
        root_logger.addHandler(console_handler)

    if file_path:
        # FileHandler stores the absolute path in baseFilename, so compare
        # against the absolute form of the requested path
        target = os.path.abspath(file_path)
        has_file = any(
            getattr(handler, "baseFilename", None) == target
            for handler in root_logger.handlers
        )
        if not has_file:
            file_handler = logging.FileHandler(file_path)
            file_handler.setFormatter(log_formatter)
            root_logger.addHandler(file_handler)

    root_logger.setLevel(level=level)
def _main():
    """
    Produces one random message to the topic, then reads the topic back to
    check that the broker delivers it

    Exits with status 0 when the message is found and with status 1 when it
    is not (message limit reached, or no further message arrived within the
    timeout).

    :return: never returns normally
    :raises SystemExit: always, carrying the status described above
    """
    args = _get_arguments()
    # Pass the log path through, otherwise --log-path has no effect
    _setup_logging(file_path=args.log_path, level=args.verbosity)
    logger = logging.getLogger(__name__)
    logger.debug("Starting up...")

    # Coerce here too, so numeric options work even if they arrive as strings
    timeout = float(args.timeout)
    max_messages = int(args.number)

    random_str = ''.join(random.choice(string.ascii_lowercase) for _ in range(20))
    logger.debug(random_str)
    # Without a value_serializer the producer expects bytes, and the consumer
    # hands back bytes, so work with the encoded form on both sides
    payload = random_str.encode("ascii")

    producer = KafkaProducer(
        bootstrap_servers=args.broker,
        client_id='kafka_produce_consume',
    )
    try:
        future = producer.send(
            topic=args.topic,
            value=payload,
        )
        result = future.get(timeout=timeout)
        logger.info(result)
    finally:
        # Release the producer's connections even when the send fails
        producer.close(timeout=timeout)
    logger.info("Produced message successfully: {}".format(random_str))

    consumer = KafkaConsumer(
        args.topic,
        auto_offset_reset="earliest",
        bootstrap_servers=args.broker,
        # Stop iterating after this long without a message instead of
        # blocking forever when our message never shows up
        consumer_timeout_ms=int(timeout * 1000),
    )
    found = False
    try:
        for i, message in enumerate(consumer):
            if i >= max_messages:
                logger.error("Reached maximum number of messages in topic, did not find our message")
                break
            if message.value == payload:
                logger.info("Found message!")
                found = True
                break
        else:
            # Iterator ended by itself: the consumer timeout expired
            logger.error("No more messages received within timeout, did not find our message")
    finally:
        consumer.close()
    # @TODO add topic deletion once admin interface is exposed in kafka-python
    sys.exit(0 if found else 1)
# Run the produce/consume check only when executed as a script, not on import
if __name__ == "__main__":
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment