Created
December 21, 2019 03:36
-
-
Save thiagocom/79933cbad7beedf2620d3545798a95bc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# FLASK | |
from functools import wraps | |
from flask import Flask, redirect, url_for, request, session, g, render_template_string, flash | |
from werkzeug.security import generate_password_hash, check_password_hash | |
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String | |
engine = create_engine("sqlite:///app.db", echo=False) | |
metadata = MetaData(bind=engine) | |
users_table = Table( | |
"users", | |
metadata, | |
Column("id", Integer, primary_key=True), | |
Column("username", String, nullable=False, index=True), | |
Column("password", String, nullable=False) | |
) | |
metadata.create_all() | |
app = Flask(__name__) | |
app.config.update({"SECRET_KEY": "S3CR3T_K3Y"}) | |
class ValidationError(Exception): | |
def __init__(self, message): | |
self.message = message | |
def __str__(self): | |
return self.message | |
class Schema(object): | |
def __init__(self, validators=None): | |
if not isinstance(validators, (list, tuple)): | |
raise TypeError("validators required list of validators") | |
self._validators = validators | |
self.errors = [] | |
def validate(self, value): | |
for v in self._validators: | |
try: | |
v.validate(value) | |
except ValidationError as err: | |
self.errors.append(err.message) | |
return self.errors | |
class Validator(object): | |
def __init__(self, message=None): | |
if message: | |
self.message = message | |
def validate(self, data): | |
pass | |
class Required(Validator): | |
message = "this field is required" | |
def validate(self, data): | |
if not data: | |
raise ValidationError(self.message) | |
class IntegerValidator(Validator): | |
def __init__(self, value, message=None): | |
if not isinstance(value, int): | |
raise TypeError("value parameter is not integer") | |
self._value = value | |
if not message: | |
message = self.message.format(value) | |
super().__init__(message) | |
class Min(IntegerValidator): | |
message = "this field required min {}" | |
def validate(self, data): | |
if (isinstance(data, int) and data > self._value) or (isinstance(data, str) and len(data) < self._value): | |
raise ValidationError(self.message) | |
class Max(IntegerValidator): | |
message = "this field required max {}" | |
def validate(self, data): | |
if (isinstance(data, int) and data < self._value) or (isinstance(data, str) and len(data) > self._value): | |
raise ValidationError(self.message) | |
class AnonymousUser(object): | |
@property | |
def is_authenticated(self): | |
return False | |
@property | |
def is_annonymous(self): | |
return True | |
class ProxyUser(object): | |
def __init__(self, inner): | |
self._inner = inner | |
def __getattr__(self, attr): | |
if attr == "is_authenticated": | |
return True | |
if attr == "is_annonymous": | |
return False | |
return getattr(self._inner, attr) | |
@app.context_processor | |
def inject_user(): | |
return dict(current_user=login_manager.current_user) | |
class LoginManager(object): | |
login_view = "login" | |
def __init__(self, app=None): | |
self._app = app | |
def init_app(self, app): | |
self._app = app | |
def user_loader(self, fn): | |
self._user_loader = fn | |
@property | |
def current_user(self): | |
try: | |
user_id = session["user_id"] | |
except KeyError as e: | |
return AnonymousUser() | |
return ProxyUser(self._user_loader(user_id)) | |
login_manager = LoginManager(app) | |
@login_manager.user_loader | |
def load_user(user_id): | |
connection = engine.connect() | |
result = connection.execute(users_table.select().where(users_table.c.id==user_id)).fetchone() | |
connection.close() | |
return result | |
def check_password(pass_hash, password): | |
return check_password_hash(pass_hash, password) | |
def generate_password(password): | |
return generate_password(password) | |
def login_required(route): | |
@wraps(route) | |
def inner(*args, **kwargs): | |
user_id = session.get("user_id") | |
if user_id: | |
return route(*args, **kwargs) | |
return redirect(url_for(login_manager.login_view)) | |
return inner | |
def login_user(user): | |
session["user_id"] = user.id | |
g.user = login_manager.current_user | |
def logout_user(): | |
session.pop("user_id", None) | |
g.user = None | |
def insert_css(route): | |
@wraps(route) | |
def inner(*args, **kwargs): | |
result = route(*args, **kwargs) | |
if not isinstance(result, str): | |
return result | |
head = """ | |
<head> | |
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Bree+Serif&display=swap"> | |
<link rel="stylesheet" href="https://bootswatch.com/4/minty/bootstrap.min.css" /> | |
<style> | |
* { | |
font-family: "Bree Serif", serif !important; | |
} | |
</style> | |
</head> | |
""" | |
body = """ | |
<body class='container mt-5 text-center'> | |
{} | |
<script src="https://code.jquery.com/jquery-3.4.1.slim.min.js"></script> | |
<script src="https://cdn.jsdelivr.net/npm/popper.js@1.16.0/dist/umd/popper.min.js"></script> | |
<script src="https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/js/bootstrap.min.js"></script> | |
</body> | |
""".format(result) | |
return head + body | |
return inner | |
@app.route("/") | |
@login_required | |
@insert_css | |
def index(): | |
return render_template_string("<h1>Welcome to development world, {{ current_user.username|capitalize }}</h1>") | |
def welcome(name): | |
return "<h1>Welcome, {}</h1>".format(name.capitalize()) | |
app.add_url_rule("/welcome/<name>", "welcome", insert_css(welcome)) | |
@app.route("/annonymous") | |
@insert_css | |
def annonymous(): | |
return "<h1>Hello, annonymous</h1>" | |
@app.route("/admin") | |
@insert_css | |
def admin(): | |
return "<h1>Hello, admin</h1>" | |
@app.route("/user/<type>") | |
def guest(type): | |
if type == "admin": | |
return redirect(url_for("admin")) | |
return redirect(url_for("annonymous")) | |
@app.route("/login", methods=["GET", "POST"]) | |
@insert_css | |
def login(): | |
if not login_manager.current_user.is_authenticated and request.method == "POST": | |
username = request.form.get("username") | |
password = request.form.get("password") | |
username_errors = Schema([Required(), Min(1), Max(4)]).validate(username) | |
password_errors = Schema([Required(), Min(1), Max(3)]).validate(password) | |
if not username_errors and not password_errors: | |
connection = engine.connect() | |
user = connection.execute(users_table.select().where(username==username)).fetchone() | |
if user: | |
connection.close() | |
if check_password(user.password, password): | |
login_user(user) | |
return redirect(url_for("index")) | |
return render_template_string(""" | |
<form action="" method="post"> | |
<div class="form-group"> | |
<input class="form-control" type="text" name="username" placeholder="Your username" /> | |
</div> | |
<div class="form-group"> | |
<input class="form-control" type="password" name="password" placeholder="Your password" /> | |
</div> | |
<button class="btn btn-block btn-outline-success" type="submit">Sign in</button> | |
</form> | |
<div class="text-center"> | |
<p class="text-muted">Don't have account? <a href="{{ url_for('register') }}">Sign up</a> | |
</div> | |
""") | |
@app.route("/register", methods=["GET", "POST"]) | |
@insert_css | |
def register(): | |
if not login_manager.current_user.is_authenticated and request.method == "POST": | |
username = request.form.get("username") | |
password = request.form.get("password") | |
if username and password: | |
connection = engine.connect() | |
if not connection.execute(users_table.select().where(username==username)).fetchone(): | |
pass_hash = generate_password_hash(password) | |
user = connection.execute(users_table.insert().values(username=username, password=pass_hash)) | |
connection.close() | |
return redirect(url_for("login")) | |
return render_template_string(""" | |
<form action="" method="post"> | |
<div class="form-group"> | |
<input class="form-control" type="text" name="username" placeholder="Your username" /> | |
</div> | |
<div class="form-group"> | |
<input class="form-control" type="password" name="password" placeholder="Your password" /> | |
</div> | |
<button class="btn btn-block btn-outline-success" type="submit">Sign up</button> | |
</form> | |
<div class="text-center"> | |
<p class="text-muted">You have account? <a href="{{ url_for('login') }}">Sign in</a> | |
</div> | |
""") | |
@app.route("/logout") | |
@login_required | |
def logout(): | |
logout_user() | |
return redirect(url_for("login")) | |
if __name__ == "__main__": | |
app.run(port=8080, debug=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment