Skip to content

Instantly share code, notes, and snippets.

@sravantit25
Created May 17, 2023 10:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sravantit25/2543171421a12bffcbb08adb842f7244 to your computer and use it in GitHub Desktop.
Save sravantit25/2543171421a12bffcbb08adb842f7244 to your computer and use it in GitHub Desktop.
"""
A Flask web app acting as Kafka consumer to fetch data from a Kafka topic.
This fetches help data (for the current week) that has been submitted to the Qxf2 Survey system.
"""
import logging
import json
from datetime import datetime
from dateutil.relativedelta import relativedelta, FR
from flask import Flask, render_template
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import kafka_conf as conf
app = Flask(__name__)
BOOTSTRAP_SERVERS = conf.BOOTSTRAP_SERVERS
TOPIC_NAME = conf.KAFKA_TOPIC
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename="consumer.log",
filemode="a",
)
def process_message(message, existing_messages):
"""
Processess only the messages submitted for the current week
Parameters:
message (str): The Kafka message as a string
existing_messages (set): The list of unique messages that have been read
Returns:
data (dict): Containing the respondent, help type, help date, and employee
"""
try:
msg = message.value.decode("utf-8")
data_dict = json.loads(msg)
data = json.loads(data_dict)
message_help_date = data.get("help_date")
date = datetime.today() + relativedelta(weekday=FR)
survey_date = date.strftime("%Y-%m-%d")
if message_help_date != survey_date:
return None
if data_dict not in existing_messages:
existing_messages.add(data_dict)
return data
except (json.JSONDecodeError, ValueError) as error:
logging.error("Error occured while processing messages: %s", error)
def fetch_messages():
"""
Connects to a Kafka cluster and consumes messages from a topic.
"""
existing_messages = set()
consumer = None
try:
consumer = KafkaConsumer(
TOPIC_NAME, bootstrap_servers=BOOTSTRAP_SERVERS, enable_auto_commit=True
)
for message in consumer:
processed_msg = process_message(message, existing_messages)
if processed_msg is None:
continue
yield processed_msg
except KafkaError as error:
logging.error("KafkaError occurred while consuming messages: %s", error)
finally:
if consumer is not None:
consumer.close()
@app.route("/survey")
def display_messages():
"""
Stream data from the topic onto Flask web page
Returns:
str: A rendered HTML page containing the data
"""
return stream_template("messages.html", data=fetch_messages())
def stream_template(template_name, **context):
"""
Enabling streaming back results to app
"""
app.update_template_context(context)
template = app.jinja_env.get_template(template_name)
streaming = template.stream(context)
return streaming
@app.errorhandler(404)
def page_not_found(error):
"""
Flask error handler function for 404 errors (HTTP status code 400)
"""
logging.error("Page not found: %s", error)
return render_template("400.html", error="Page not found"), 404
if __name__ == "__main__":
app.run(port=9000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment