Created
May 18, 2023 14:08
-
-
Save phloreenm/48a9a3d214f9857c264570497817d720 to your computer and use it in GitHub Desktop.
eManager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from flask_pymongo import PyMongo | |
from pymongo import MongoClient | |
from dotenv import load_dotenv | |
from flask import ( | |
Flask, flash, render_template, request, redirect, url_for, session) | |
from bson.objectid import ObjectId | |
from werkzeug.security import generate_password_hash, check_password_hash | |
import logging | |
from utils.utility_functions import ( | |
check_sysadmin_user, format_date, process_form_data, should_edit_role) | |
if os.path.exists('env.py'): | |
import env | |
load_dotenv() | |
MONGO_URI = os.getenv('MONGO_URI') | |
MONGO_DBNAME = os.getenv('MONGO_DBNAME') | |
SECRET_KEY = os.getenv('SECRET_KEY') | |
IP = os.getenv('IP') | |
PORT = os.getenv('PORT') | |
app = Flask(__name__) | |
app.jinja_env.trim_blocks = True | |
app.jinja_env.lstrip_blocks = True | |
app.config['SECRET_KEY'] = SECRET_KEY | |
app.config['MONGO_URI'] = MONGO_URI | |
app.config['MONGO_DBNAME'] = MONGO_DBNAME | |
app.config['IP'] = IP | |
app.config['PORT'] = PORT | |
# Create MongoDB client instance: | |
client = MongoClient( | |
app.config['MONGO_URI'], | |
appname=app.name | |
) | |
# Access MongoDB database and collections with Python client instance: | |
db = client.get_database(app.config['MONGO_DBNAME']) | |
# Access collections: | |
users_coll = db['users'] | |
procedures_coll = db['procedures'] | |
daily_tasks_coll = db['daily_tasks'] | |
roles_coll = db['roles'] | |
temps_coll = db['daily_tasks_temps'] | |
dt_reps_coll = db['daily_tasks_reports'] | |
# function to create/check if 'sysadmin' users exists in the database | |
check_sysadmin_user() | |
@app.route('/') | |
@app.route('/index') | |
def index(): | |
try: | |
if 'user' in session: | |
return redirect(url_for('dashboard', role=session['role'])) | |
return render_template('index.html') | |
except Exception as e: | |
logging.error(f'Error accessing MongoDB: {e}', exc_info=True) | |
error_message = e | |
return render_template('error.html', error_message=error_message) | |
@app.route('/register', methods=['GET', 'POST']) | |
def register(): | |
if request.method == 'POST': | |
# Check if username or email have been used | |
form_un = request.form.get('un').lower() | |
form_email = request.form.get('email1').lower() | |
user = users_coll.find_one({'un': form_un}) | |
email = users_coll.find_one({'email': form_email}) | |
if ( | |
user == form_un.lower()) or ( | |
email == form_email.lower()): | |
if user: | |
flash(f'Username {user["un"]} already exists. ' | |
f'Choose a different username.', 'info') | |
elif email: | |
flash(f'Email {email["email"]} already exists. ' | |
f'Choose a different email.', 'info') | |
return redirect(url_for('register')) | |
new_user = { | |
'role': request.form.get('role'), | |
'un': request.form.get('un').lower(), | |
'pw': generate_password_hash(request.form.get('password1')), | |
'fname': request.form.get('first_name').lower(), | |
'lname': request.form.get('last_name').lower(), | |
'email': request.form.get('email1').lower(), | |
'company': request.form.get('company').lower() | |
} | |
# If new user, add it to the database: | |
users_coll.insert_one(new_user) | |
role = session['role'] | |
flash(f'\nUser {new_user["un"]} successfully registered!', 'success') | |
return redirect(url_for('users', role=role)) | |
db_roles = list(roles_coll.find()) | |
db_user_comp = users_coll.find_one( | |
{'un': session['user']} | |
) | |
restaurant = db_user_comp['company'] | |
return render_template( | |
'register_user.html', db_roles=db_roles, restaurant=restaurant) | |
@app.route('/login', methods=['GET', 'POST']) | |
def login(): | |
''' | |
Check if user is already logged in and redirect him to first page: | |
may be removed, as id user is session it's redirected to index | |
and the login link is not shown anymore | |
''' | |
if 'user' in session: | |
return redirect(url_for('index')) | |
if request.method == 'POST': | |
# Check if username is already registered | |
user = users_coll.find_one( | |
{'un': request.form.get('un').lower()} | |
) | |
if user: | |
# Check if password is correct | |
if check_password_hash(user['pw'], request.form.get('password')): | |
session['user'] = request.form.get('un').lower() | |
session['role'] = user['role'].lower() | |
flash( | |
f'\nUser {session["user"].title()} logged in!', 'success') | |
# return to the dashboard specific to user's role: | |
role = user['role'] | |
return redirect(url_for('dashboard', role=role)) | |
else: | |
flash('Incorrect Username and/or Password', 'danger') | |
return redirect(url_for('login')) | |
else: | |
# Username not registered | |
flash('Incorrect Username and/or Password', 'danger') | |
return redirect(url_for('login')) | |
return render_template('login.html') | |
@app.route('/edit_user/<user_id>', methods=['GET', 'POST']) | |
def edit_user(user_id): | |
# query the database for the user to be edited | |
# and populate form with user's info: | |
user = users_coll.find_one( | |
{'_id': ObjectId(user_id)} | |
) | |
roles = list(roles_coll.find()) | |
company = user['company'] | |
restaurant_manager_count = users_coll.count_documents({ | |
'company': company, | |
'role': 'manager', | |
}) | |
can_edit_role = should_edit_role( | |
restaurant_manager_count, session['role'], user['role']) | |
print("can_edit_role", can_edit_role) | |
if request.method == "POST": | |
if request.form.get('password'): | |
# if password is NOT empty: | |
if request.form.get('password1'): | |
# update user's info: | |
users_coll.update_one( | |
{'_id': ObjectId(user_id)}, | |
{ | |
'$set': { | |
'role': request.form.get('role').lower(), | |
'company': request.form.get('company').lower(), | |
'un': request.form.get('un').lower(), | |
'fname': request.form.get('first_name').lower(), | |
'lname': request.form.get('last_name').lower(), | |
'email': request.form.get('email1').lower(), | |
'pw': generate_password_hash( | |
request.form.get('password1')) | |
} | |
} | |
) | |
# if password field IS empty: | |
else: | |
# update user's info: | |
users_coll.update_one( | |
{'_id': ObjectId(user_id)}, | |
{ | |
'$set': { | |
'role': request.form.get('role').lower(), | |
'company': request.form.get('company').lower(), | |
'un': request.form.get('un').lower(), | |
'fname': request.form.get('first_name').lower(), | |
'lname': request.form.get('last_name').lower(), | |
'email': request.form.get('email1').lower() | |
} | |
} | |
) | |
flash('User updated successfully!', 'success') | |
return redirect(url_for('users', role=session['role'])) | |
# if password field is not empty: | |
flash('User updated successfully!', 'success') | |
return redirect(url_for('users')) | |
return render_template( | |
'edit_user.html', user=user, roles=roles, can_edit_role=can_edit_role, | |
restaurant=user['company'], | |
restaurant_manager_count=restaurant_manager_count) | |
@app.route('/delete_user/<user_id>') | |
def delete_user(user_id): | |
# Prevent deleting System Administrator: | |
sys_admin_user = users_coll.find_one( | |
{'_id': ObjectId(user_id), 'un': 'sysadmin'}) | |
if sys_admin_user is not None: | |
flash('Can not delete System Administrator', 'danger') | |
return redirect(url_for('users', role=session['role'])) | |
# Delete user from the database: | |
users_coll.delete_one({'_id': ObjectId(user_id)}) | |
# Query the database if the user was deleted | |
user = users_coll.find_one( | |
{'_id': ObjectId(user_id)} | |
) | |
if user: | |
flash('User not deleted!', 'danger') | |
else: | |
flash('User deleted successfully!', 'success') | |
return redirect(url_for('users', role=session['role'])) | |
@app.route('/logout') | |
def logout(): | |
# Check if user is logged in: | |
if 'user' not in session: | |
return redirect(url_for('login')) | |
# Remove user from session cookies | |
flash(f'\nUser {session["user"]} logged out!', 'success') | |
# session.pop('user') | |
session.clear() | |
return redirect(url_for('index')) | |
def filter_admin_users(users): | |
return [user for user in users if user['role'] in ['admin', 'manager']] | |
def filter_manager_users(users, company): | |
return [user for user in users if user['company'] == company] | |
def filter_employee_users(users, company): | |
return [ | |
user for user in users if user[ | |
'company'] == company and user['role'] == 'employee'] | |
@app.route('/dashboard/<role>') | |
def dashboard(role): | |
# Check if user is logged in: | |
if 'user' not in session: | |
return redirect(url_for('login')) | |
# Set user's role in session: | |
session['user_role'] = role | |
# company = users_coll.find_one({'un': logged_in_user})['company'] | |
# Check user's role and redirect if not logged in: | |
if role not in ['admin', 'manager', 'employee']: | |
flash('You\'re not authorized to access this page', 'danger') | |
return redirect(url_for('login')) | |
# Get list of all users: | |
users_list = list(users_coll.find().sort('role', 1)) | |
print("users_list", users_list) | |
logged_in_user = session['user'] | |
print("logged_in_user", logged_in_user) | |
logged_user_company = users_coll.find_one( | |
{'un': logged_in_user})['company'] | |
print("logged_user_company", logged_user_company) | |
# Filter users based on role: | |
if role == 'admin': | |
filtered_users = filter_admin_users(users_list) | |
return render_template('dashboard_admin.html', users=filtered_users) | |
elif role == 'manager': | |
filtered_users = filter_manager_users(users_list, logged_user_company) | |
return render_template('dashboard_manager.html', users=filtered_users) | |
else: | |
filtered_users = filter_employee_users( | |
users_list, logged_user_company and user['role'] == 'employee') | |
return render_template('dashboard_employee.html', users=filtered_users) | |
@app.route('/users') | |
def users(): | |
try: | |
logged_in_user = session['user'] | |
user_role = session['role'] | |
if user_role == 'admin': | |
users = list(users_coll.find().sort('role', 1)) | |
else: | |
company = users_coll.find_one({'un': logged_in_user})['company'] | |
users = list(users_coll.find({'company': company}).sort('role', 1)) | |
return render_template('users.html', users=users) | |
except Exception as e: | |
logging.error(f'Error accessing MongoDB: {e}', exc_info=True) | |
error_message = e | |
return render_template('error.html', error_message=error_message) | |
# @app.route('/procedures') | |
# # @login_required | |
# def procedures(): | |
# # Check if user is logged in: | |
# if 'user' not in session: | |
# return redirect(url_for('login')) | |
# try: | |
# procedures = list(procedures_coll.find()) | |
# return render_template('procedures.html', procedures=procedures) | |
# except Exception as e: | |
# logging.error(f'Error accessing MongoDB: {e}', exc_info=True) | |
# error_message = e | |
# return render_template('error.html', error_message=error_message) | |
@app.route('/tasks') | |
def tasks(): | |
tasks = list(daily_tasks_coll.find()) | |
t_reports = list(temps_coll.find( | |
{}).sort('timestamp', -1).limit(20)) | |
return render_template('tasks.html', tasks=tasks, t_reports=t_reports) | |
@app.route('/temps_form', methods=['GET', 'POST']) | |
def temps_form(): | |
if request.method == "POST": | |
ts_str = request.form['timestamp_tf'] | |
# ts_obj = format_date(ts_str) | |
ts_obj = datetime.strptime(ts_str, '%Y-%m-%d %H:%M') | |
iso_date = ts_obj.isoformat() | |
user = users_coll.find_one( | |
{'un': session['user'].lower()} | |
) | |
# get the form data: | |
temp_measurement = { | |
'dish_name': request.form.get('dish_name').lower(), | |
'probe_used': request.form.get('probe_used').lower(), | |
'temperature': request.form.get('temperature'), | |
'unit': request.form.get('unit').lower(), | |
'timestamp': iso_date, | |
'user': user['un'].lower(), | |
'company': user['company'].lower(), | |
'comments': request.form.get('comments').lower() | |
} | |
temps_coll.insert_one(temp_measurement) | |
flash('Temperature added successfully!', 'success') | |
return redirect(url_for('reports', report_type='temperatures')) | |
return render_template('temps_form.html') | |
@app.route('/shift_form/<form_type>', methods=['GET', 'POST']) | |
def shift_form(form_type): | |
dt = list(daily_tasks_coll.find({'t_type': form_type.lower()}).limit(1)) | |
if request.method == "POST": | |
process_form_data(request.form, dt, session['user']) | |
if (form_type == 'begin'): | |
flash('Commencing Shift Report added successfully!', 'success') | |
elif (form_type == 'finish'): | |
flash('Finishing Shift Report added successfully!', 'success') | |
return redirect(url_for('tasks')) | |
return render_template('shift_form.html', dt=dt) | |
@app.route('/reports/<report_type>') | |
def reports(report_type): | |
user = users_coll.find_one( | |
{'un': session['user'].lower()} | |
) | |
if report_type == 'begin': | |
reports = list(dt_reps_coll.find( | |
{'t_type': 'begin'}).sort('timestamp', -1).limit(20)) | |
for report in reports: | |
report['t_ts_submit'] = format_date(report['t_ts_submit']) | |
return render_template( | |
'reports_list.html', reports=reports) | |
elif report_type == 'finish': | |
reports = list(dt_reps_coll.find( | |
{'t_type': 'finish'}).sort('timestamp', -1).limit(20)) | |
for report in reports: | |
report['t_ts_submit'] = format_date(report['t_ts_submit']) | |
return render_template('reports_list.html', reports=reports) | |
elif report_type == 'temperatures': | |
reports = list(temps_coll.find( | |
{'company': user['company']}).sort('timestamp', -1).limit(40)) | |
for r in reports: | |
dt = r['timestamp'] | |
r['timestamp'] = format_date(dt) | |
return render_template('temps_report.html', reports=reports) | |
else: | |
# handle invalid report type | |
flash('Invalid report type.', 'success') | |
return render_template('error.html') | |
@app.route('/report/<report_id>') | |
def report(report_id): | |
report = dt_reps_coll.find_one({'_id': ObjectId(report_id)}) | |
answers = report['answers'] | |
date = format_date(report['t_ts_submit']) | |
return render_template( | |
'report_details.html', report=report, answers=answers, date=date) | |
@app.route('/delete_report/<report_id>') | |
def delete_report(report_id): | |
report = dt_reps_coll.find_one({'_id': ObjectId(report_id)}) | |
# delete report from the database: | |
try: | |
dt_reps_coll.delete_one({'_id': ObjectId(report_id)}) | |
# query the database if the report was deleted | |
deleted_report = users_coll.find_one( | |
{'_id': ObjectId(report_id)} | |
) | |
except Exception as e: | |
print(e) | |
deleted_report = False | |
if deleted_report: | |
flash('Report not deleted!', 'danger') | |
else: | |
flash('Report deleted successfully!', 'success') | |
print(report['t_type']) | |
return redirect(url_for('reports', report_type=report['t_type'])) | |
@app.route('/delete_temp/<temp_id>') | |
def delete_temp(temp_id): | |
# delete report from the database: | |
try: | |
temps_coll.delete_one({'_id': ObjectId(temp_id)}) | |
# query the database if the report was deleted | |
deleted_report = temps_coll.find_one( | |
{'_id': ObjectId(temp_id)} | |
) | |
except Exception as e: | |
print(e) | |
deleted_report = False | |
if deleted_report: | |
flash('Temperature Report not deleted!', 'danger') | |
else: | |
flash('Temperature Report deleted successfully!', 'success') | |
return redirect(url_for('reports', report_type='temperatures')) | |
if __name__ == '__main__': | |
app.run( | |
host=app.config['IP'], | |
port=int(app.config['PORT']), | |
debug=True | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{% extends "base.html" %} | |
{% block meta %} | |
Dashboard page | |
{% endblock %} | |
{% block title %}Dashboard - eManager{% endblock %} | |
{% block content %} | |
<div class="container col s12"> | |
{% with messages = get_flashed_messages() %} | |
{% if messages %} | |
{% for message in messages %} | |
<div id="flash_msg" class="row"> | |
<div class="col s12"> | |
<ul class="collection"> | |
<li class="collection-item avatar"> | |
<i class="fas fa-exclamation-triangle"></i> | |
<span class="title">Message:</span> | |
<p> | |
{{ message }} | |
</p> | |
<a href="#!" class="secondary-content"><i class="material-icons">grade</i></a> | |
</li> | |
</ul> | |
</div> | |
</div> | |
{% endfor %} | |
{% endif %} | |
{% endwith %} | |
</div> | |
<div class="row"> | |
<div class="col s12"> | |
<h3>Hi {{ session['user'].title() }}!</h3> | |
<h3>Welcome to eMANAGER {{ session['role'] }} dashboard</h3> | |
</div> | |
</div> | |
<div class="row"> | |
<div class="col s12"> | |
<div class="card blue-grey darken-1"> | |
<div class="card-content white-text"> | |
<span class="card-title">View Daily Tasks Reports</span> | |
<p>Lorem ipsum dolor sit amet consectetur adipisicing elit. Accusantium numquam placeat quis libero repellat ipsum debitis, ullam ex. Consectetur exercitationem consequatur odio rerum porro accusantium suscipit vel sequi excepturi delectus?</p> | |
<table class="responsive-table striped"> | |
<thead> | |
<tr> | |
<th>First Name</th> | |
<th>Last Name</th> | |
<th>Role</th> | |
</tr> | |
</thead> | |
<tbody> | |
{% for user in users %} | |
<tr> | |
<td>{{ user['fname'].title() }}</td> | |
<td>{{ user['lname'].title() }}</td> | |
<td>{{ user['role'].title() }}</td> | |
</tr> | |
{% endfor %} | |
</tbody> | |
</table> | |
</div> | |
<div class="card-action"> | |
<a href="#">Today's reports</a> | |
<a href="#">All reports</a> | |
</div> | |
</div> | |
</div> | |
<div class="col s12"> | |
<div class="card blue-grey darken-1"> | |
<div class="card-content white-text"> | |
<span class="card-title">Procedures</span> | |
<p>Lorem ipsum dolor sit amet consectetur adipisicing elit. Obcaecati error eveniet laudantium exercitationem rerum natus, consequatur odit ducimus officiis, nostrum iste cupiditate reiciendis quisquam ad aspernatur nemo ratione optio. Cumque?</p> | |
</div> | |
<div class="card-action"> | |
<a href="#">Add new procedure</a> | |
<a href="#">View all procedures</a> | |
</div> | |
</div> | |
</div> | |
<div class="col s12"> | |
<div class="card blue-grey darken-1"> | |
<div class="card-content white-text"> | |
<span class="card-title col s12 push-s4 pull-s2">View Employees</span> | |
<p>Lorem ipsum dolor sit amet consectetur adipisicing elit. Accusantium numquam placeat quis libero repellat ipsum debitis, ullam ex. Consectetur exercitationem consequatur odio rerum porro accusantium suscipit vel sequi excepturi delectus?</p> | |
</div> | |
<div class="card-action"> | |
<a href="#">Add new employee</a> | |
<a href="#">View all employees</a> | |
</div> | |
</div> | |
</div> | |
</div> | |
</div> | |
{% endblock %} | |
{% block javascript %} | |
<script> | |
$('#dashboard').toggleClass('current'); | |
</script> | |
{% endblock %} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pymongo.errors import ConnectionFailure | |
from datetime import datetime | |
# if default admin user is not in the DB, then set variable to False | |
default_admin_user_created = False | |
def check_sysadmin_user(): | |
""" | |
This function is called from the app.py file. | |
It checks if the default admin user is in the database. | |
If it is not, then it creates it. This is to ensure that has a default 'sysadmin' user, | |
which is the starting point of the application. | |
""" | |
global default_admin_user_created | |
if default_admin_user_created: | |
return | |
from app import users_coll | |
try: | |
# Check if 'sysadmin' user already exists in the database | |
sysadmin = users_coll.find_one({'un': 'sysadmin'}) | |
if sysadmin is not None: | |
return | |
# If 'sysadmin' user does not exist, create it with default password | |
hashed_password = generate_password_hash('1111') | |
new_user = { | |
'role': 'admin', | |
'un': 'sysadmin', | |
'pw': hashed_password, | |
'fname': 'System', | |
'lname': 'Admin', | |
'email': 'sysadmin@example.com', | |
'company': 'eManager' | |
} | |
users_coll.insert_one(new_user) | |
default_admin_user_created = True | |
except (ConnectionFailure, Exception) as e: | |
flash( | |
'Unable to connect to the database: ' + str(e), 'error') | |
def format_date(date_str): | |
""" | |
This functions is called each time there is a need to convert a date string from the database to a more readable format. | |
""" | |
date_obj = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S') | |
return date_obj.strftime('%Y-%m-%d %H:%M:%S') | |
def process_form_data(form_data, dt, session_user): | |
""" | |
This function is called from the app.py file | |
when processing the reports form data submitted by the user. | |
""" | |
tasks = [] | |
task_list_db = [] | |
task_responses = {} | |
# get the form data: | |
ts_str = form_data['timestamp_tf'] | |
ts_obj = datetime.strptime(ts_str, '%Y-%m-%dT%H:%M') | |
iso_date = ts_obj.isoformat() | |
# get tasks index and user's choices and pair them in a dict: | |
for key in form_data.keys(): | |
if key.startswith('task_'): | |
task_number = int(key.split('_')[-1]) | |
tasks.append(form_data[f'task_{task_number}']) | |
task_responses[task_number] = form_data[key] | |
# get tasks values from DB and append them to the task_list_db list | |
for d in dt: | |
for t in d['tasks']: | |
task_list_db.append(t) | |
# create a dict with list containg the tasks and the user's choices | |
tasks_answers_pairs = { | |
str(key): [task_list_db[key-1], task_responses[key]] | |
for key in task_responses.keys()} | |
# get the form data: | |
observations = form_data.get('obs') | |
if observations is None: | |
observations = '' | |
c_shift_rep = { | |
't_type': dt[0]['t_type'], | |
't_ts_submit': iso_date, | |
't_rp_un': session_user.lower(), | |
'answers': tasks_answers_pairs, | |
't_obs': observations.lower(), | |
} | |
dt_reps_coll.insert_one(c_shift_rep) | |
return c_shift_rep | |
def should_edit_role( | |
restaurant_manager_count: int, | |
session_role: str, user_role: str) -> bool: | |
""" | |
Determines whether the user can edit the role of another user based on their session role, | |
the number of managers in the same restaurant, and the role of the user being edited. | |
Returns a boolean value. | |
""" | |
if session_role == 'manager': | |
if restaurant_manager_count <= 1 and user_role == 'manager': | |
can_edit = False | |
else: | |
can_edit = True | |
elif session_role == 'admin': | |
can_edit = True | |
else: | |
can_edit = False | |
return can_edit |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment