-
-
Save sravantit25/2543171421a12bffcbb08adb842f7244 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A Flask web app acting as Kafka consumer to fetch data from a Kafka topic. | |
This fetches help data (for the current week) that has been submitted to the Qxf2 Survey system. | |
""" | |
import logging | |
import json | |
from datetime import datetime | |
from dateutil.relativedelta import relativedelta, FR | |
from flask import Flask, render_template | |
from kafka import KafkaConsumer | |
from kafka.errors import KafkaError | |
import kafka_conf as conf | |
app = Flask(__name__) | |
BOOTSTRAP_SERVERS = conf.BOOTSTRAP_SERVERS | |
TOPIC_NAME = conf.KAFKA_TOPIC | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s %(levelname)s: %(message)s", | |
datefmt="%Y-%m-%d %H:%M:%S", | |
filename="consumer.log", | |
filemode="a", | |
) | |
def process_message(message, existing_messages): | |
""" | |
Processess only the messages submitted for the current week | |
Parameters: | |
message (str): The Kafka message as a string | |
existing_messages (set): The list of unique messages that have been read | |
Returns: | |
data (dict): Containing the respondent, help type, help date, and employee | |
""" | |
try: | |
msg = message.value.decode("utf-8") | |
data_dict = json.loads(msg) | |
data = json.loads(data_dict) | |
message_help_date = data.get("help_date") | |
date = datetime.today() + relativedelta(weekday=FR) | |
survey_date = date.strftime("%Y-%m-%d") | |
if message_help_date != survey_date: | |
return None | |
if data_dict not in existing_messages: | |
existing_messages.add(data_dict) | |
return data | |
except (json.JSONDecodeError, ValueError) as error: | |
logging.error("Error occured while processing messages: %s", error) | |
def fetch_messages(): | |
""" | |
Connects to a Kafka cluster and consumes messages from a topic. | |
""" | |
existing_messages = set() | |
consumer = None | |
try: | |
consumer = KafkaConsumer( | |
TOPIC_NAME, bootstrap_servers=BOOTSTRAP_SERVERS, enable_auto_commit=True | |
) | |
for message in consumer: | |
processed_msg = process_message(message, existing_messages) | |
if processed_msg is None: | |
continue | |
yield processed_msg | |
except KafkaError as error: | |
logging.error("KafkaError occurred while consuming messages: %s", error) | |
finally: | |
if consumer is not None: | |
consumer.close() | |
@app.route("/survey") | |
def display_messages(): | |
""" | |
Stream data from the topic onto Flask web page | |
Returns: | |
str: A rendered HTML page containing the data | |
""" | |
return stream_template("messages.html", data=fetch_messages()) | |
def stream_template(template_name, **context): | |
""" | |
Enabling streaming back results to app | |
""" | |
app.update_template_context(context) | |
template = app.jinja_env.get_template(template_name) | |
streaming = template.stream(context) | |
return streaming | |
@app.errorhandler(404) | |
def page_not_found(error): | |
""" | |
Flask error handler function for 404 errors (HTTP status code 400) | |
""" | |
logging.error("Page not found: %s", error) | |
return render_template("400.html", error="Page not found"), 404 | |
if __name__ == "__main__": | |
app.run(port=9000) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment