View create_user_faked_hash.py
def create_user(db: Session, user: schemas.UserCreate): | |
fake_hashed_password = user.password + "notreallyhashed" | |
db_user = models.UserInfo(username=user.username, password=fake_hashed_password, fullname=user.fullname) | |
db.add(db_user) | |
db.commit() | |
db.refresh(db_user) | |
return db_user |
View create_user_hash_password_bcrypt.py
def create_user(db: Session, user: schemas.UserCreate): | |
hashed_password = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt()) | |
db_user = models.UserInfo(username=user.username, password=hashed_password, fullname=user.fullname) | |
db.add(db_user) | |
db.commit() | |
db.refresh(db_user) | |
return db_user |
View bcrypt_check_password.py
def check_username_password(db: Session, user: schemas.UserAuthenticate): | |
db_user_info: models.UserInfo = get_user_by_username(db, username=user.username) | |
return bcrypt.checkpw(user.password.encode('utf-8'), db_user_info.password.encode('utf-8')) |
View token_schema.py
class Token(BaseModel): | |
access_token: str | |
token_type: str |
View access_token_method.py
def create_access_token(*, data: dict, expires_delta: timedelta = None): | |
secret_key = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" | |
algorithm = "HS256" | |
to_encode = data.copy() | |
if expires_delta: | |
expire = datetime.utcnow() + expires_delta | |
else: | |
expire = datetime.utcnow() + timedelta(minutes=15) | |
to_encode.update({"exp": expire}) | |
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm) |
View authenticate_api.py
@app.post("/authenticate", response_model=schemas.Token) | |
def authenticate_user(user: schemas.UserAuthenticate, db: Session = Depends(get_db)): | |
db_user = crud.get_user_by_username(db, username=user.username) | |
if db_user is None: | |
raise HTTPException(status_code=400, detail="Username not existed") | |
else: | |
is_password_correct = crud.check_username_password(db, user) | |
if is_password_correct is False: | |
raise HTTPException(status_code=400, detail="Password is not correct") | |
else: |
View database.py
from sqlalchemy import create_engine | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.orm import sessionmaker | |
SQLALCHEMY_DATABASE_URL = "mysql+mysqlconnector://root:cuong1990@localhost:3306/restapi" | |
engine = create_engine( | |
SQLALCHEMY_DATABASE_URL, | |
) | |
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) |
View crud.py
from sqlalchemy.orm import Session | |
from . import models, schemas | |
import bcrypt | |
def get_user_by_username(db: Session, username: str): | |
return db.query(models.UserInfo).filter(models.UserInfo.username == username).first() | |
def create_user(db: Session, user: schemas.UserCreate): |
View models.py
from sqlalchemy import Column, Integer, String | |
from sql_app.database import Base | |
class UserInfo(Base): | |
__tablename__ = "user_info" | |
id = Column(Integer, primary_key=True, index=True) | |
username = Column(String, unique=True) | |
password = Column(String) |
View schemas.py
from typing import List | |
from pydantic import BaseModel | |
class UserInfoBase(BaseModel): | |
username: str | |
class UserCreate(UserInfoBase): | |
fullname: str |
OlderNewer