Skip to content

Instantly share code, notes, and snippets.

@faustomilletari
Created April 26, 2020 17:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save faustomilletari/2112e8551cee401cb8b36e7d07d34b4d to your computer and use it in GitHub Desktop.
Save faustomilletari/2112e8551cee401cb8b36e7d07d34b4d to your computer and use it in GitHub Desktop.
Database class for Lambda function (Medium post)
import binascii
import hashlib
import hmac
import os

from sqlalchemy import create_engine
from sqlalchemy import Table, Column, String, MetaData, select, and_
def hash_passwd(passwd):
    """Derive a storable hash for ``passwd`` using a freshly generated salt.

    The salt is the hex SHA-256 digest of 60 random bytes from
    ``os.urandom``. The password is stretched with PBKDF2-HMAC-SHA512
    (100000 iterations) keyed by the ASCII bytes of that salt.

    Returns:
        A ``(hash_hex, salt_hex)`` tuple of ``str`` values; both must be
        persisted to be able to re-derive and compare the hash later.
    """
    random_bytes = os.urandom(60)
    salt_hex = hashlib.sha256(random_bytes).hexdigest()
    derived = hashlib.pbkdf2_hmac(
        'sha512',
        passwd.encode('utf-8'),
        salt_hex.encode('ascii'),
        100000,
    )
    return derived.hex(), salt_hex
def hash_passwd_with_salt(passwd, salt):
    """Re-derive the hash of ``passwd`` using an already known ``salt``.

    Uses PBKDF2-HMAC-SHA512 with 100000 iterations, so the result can be
    compared with a hash previously produced by ``hash_passwd`` for the
    same salt. This function only computes the hash; the comparison is
    left to the caller.

    Args:
        passwd: Clear-text password (``str``), encoded as UTF-8.
        salt: Salt string (``str``), encoded as ASCII.

    Returns:
        The derived key as a lowercase hex ``str``.
    """
    derived = hashlib.pbkdf2_hmac(
        'sha512',
        passwd.encode('utf-8'),
        salt.encode('ascii'),
        100000,
    )
    return derived.hex()
class DB:
    """Thin wrapper around a PostgreSQL ``users`` table accessed via SQLAlchemy.

    The table stores one row per user keyed by e-mail, together with the
    PBKDF2 password hash and the salt used to derive it.
    """

    def __init__(self, user, password, host, port):
        """Create the engine and declare the ``users`` table.

        Args:
            user: Database user name.
            password: Database password.
            host: Database host.
            port: Database port.

        NOTE: ``user`` and ``password`` are interpolated into the URL
        verbatim, so callers must URL-encode any special characters
        (e.g. ``@`` or ``/``) themselves.
        """
        self.user = user
        self.password = password
        self.host = host
        self.port = port

        # "postgresql://" is the dialect name SQLAlchemy accepts on every
        # version; the legacy "postgres://" alias is rejected by 1.4+.
        db_string = "postgresql://{}:{}@{}:{}".format(
            self.user,
            self.password,
            self.host,
            self.port
        )
        self.db = create_engine(db_string)

        # Unbound metadata: binding MetaData to an engine was removed in
        # SQLAlchemy 2.0. The engine is passed explicitly where needed.
        self.meta = MetaData()
        self.user_table = Table(
            'users', self.meta,
            Column('email', String, primary_key=True),
            Column('name', String),
            Column('surname', String),
            Column('password', String),
            Column('salt', String)
        )

    def create_tables(self):
        """Create every table declared on ``self.meta``; returns ``True``."""
        self.meta.create_all(self.db)
        return True

    def insert_new_user(self, user):
        """Insert a new user row, hashing the password with a fresh salt.

        Args:
            user: Mapping with the keys ``'email'``, ``'name'``,
                ``'surname'`` and ``'password'`` (clear text).

        Returns:
            ``True`` once the row has been inserted. Errors raised by the
            database (e.g. a duplicate primary key) propagate to the caller.
        """
        key, salt = hash_passwd(user['password'])
        insert_statement = self.user_table.insert().values(
            email=user['email'],
            name=user['name'],
            surname=user['surname'],
            password=key,
            salt=salt
        )
        # engine.begin() commits on success and rolls back on error, so the
        # write is persisted regardless of the engine's autocommit behavior.
        with self.db.begin() as conn:
            conn.execute(insert_statement)
        return True

    def login_user(self, email, password):
        """Check ``password`` against the stored hash for ``email``.

        Returns:
            ``True`` when a user with that e-mail exists and the password
            matches, ``False`` otherwise.
        """
        select_st = self.user_table.select().where(
            self.user_table.c.email == email
        )
        with self.db.connect() as conn:
            row = conn.execute(select_st).first()
        if row is None:
            return False

        key = hash_passwd_with_salt(password, row.salt)
        # A NULL stored hash can never match; treat it as an empty string.
        stored = row.password or ''
        # Constant-time comparison avoids leaking how many leading
        # characters of the hash matched.
        # subject to JWT token release after this call
        return hmac.compare_digest(key.encode('utf-8'), stored.encode('utf-8'))

    def remove_user(self, email):
        """Delete the user identified by ``email``.

        Returns:
            ``True`` if a row was deleted, ``False`` if no such user existed.
        """
        del_st = self.user_table.delete().where(
            self.user_table.c.email == email
        )
        # A single DELETE inside one transaction replaces the previous
        # select-then-delete pair, which used two separate connections.
        with self.db.begin() as conn:
            deleted = conn.execute(del_st).rowcount > 0
        return deleted
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment