Skip to content

Instantly share code, notes, and snippets.

@Geesu
Created June 14, 2017 18:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Geesu/8cc7158dfdd135ee701d7893bfc15dfe to your computer and use it in GitHub Desktop.
Save Geesu/8cc7158dfdd135ee701d7893bfc15dfe to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# Copyright (c) 2016 MariaDB Corporation Ab
#
# Use of this software is governed by the Business Source License included
# in the LICENSE.TXT file and at www.mariadb.com/bsl.
#
# Change Date: 2019-01-01
#
# On the date above, in accordance with the Business Source License, use
# of this software will be governed by version 2 or later of the General
# Public License.
# This program requires the kafka-python package which you can install with:
#
# pip install kafka-python
#
import sys
import argparse
import logging
from kafka import KafkaProducer
logger = logging.getLogger('myapp')
hdlr = logging.FileHandler('/var/tmp/myapp.log')
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.WARNING)
logger.error('Starting app...')
parser = argparse.ArgumentParser(description = "Publish JSON data read from standard input to a Kafka broker")
parser.add_argument("-K", "--kafka-broker", dest="kafka_broker",
help="Kafka broker in host:port format",
default=None, required=True)
parser.add_argument("-T", "--kafka-topic", dest="kafka_topic",
help="Kafka topic where the data is published",
default=None, required=True)
logger.error('Parsing args..')
opts = parser.parse_args(sys.argv[1:])
producer = KafkaProducer(bootstrap_servers=[opts.kafka_broker])
logger.error('Entering loop')
while True:
try:
buf = sys.stdin.readline()
logger.error('Reading data')
if len(buf) == 0:
break
data = buf.encode().strip()
logger.error(data)
producer.send(topic=opts.kafka_topic, value=data)
producer.flush()
# All other errors should interrupt the processing
except Exception as ex:
newFile = open("/var/tmp/last_data.bin", "wb")
newFile.write(buf)
print(ex)
break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment