Skip to content

Instantly share code, notes, and snippets.

@miohtama
Created May 3, 2016 23:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save miohtama/69b5c365ec5e5ddd1d0b2ad2869460e8 to your computer and use it in GitHub Desktop.
Save miohtama/69b5c365ec5e5ddd1d0b2ad2869460e8 to your computer and use it in GitHub Desktop.
SMS authentication for Pyramid and Websauna
"""
allows users to sign up and login with their phone number, if that number is assoicated with an
existing customer record.
"""
import colander
from cornice.service import Service
from pyramid_sms.utils import normalize_us_phone_number
from websauna.system.user.events import UserCreated
from websauna.system.user.utils import get_login_service
from websauna.utils.time import now
from .. import path
from ..smslogin import prepare_phone_verification, verify_token, SMSTokenVerifyException
from ..models import User, get_customers_for_phone_number
@colander.deferred
def existing_customer_phone_number(node, kw):
request = kw['request']
def validator(node, phone_number):
phone_number = normalize_us_phone_number(phone_number)
customers = list(get_customers_for_phone_number(request.dbsession, phone_number))
if len(customers) == 0:
raise colander.Invalid(node, 'No customer with this phone number')
return validator
@colander.deferred
def validate_token(node, kw):
request = kw['request']
def validator(node, token):
if not token.isnumeric():
raise colander.Invalid(node, 'malformed verification token (must be numeric)')
if len(token) != 4:
raise colander.Invalid(node, 'malformed verification token')
def exception_factory(msg):
raise colander.Invalid(node, msg)
try:
request.validated['phone_number'] = verify_token(request, token)
except SMSTokenVerifyException as exc:
raise colander.Invalid(node, exc.msg)
return validator
class LoginInitSchema(colander.MappingSchema):
phone_number = colander.SchemaNode(
colander.String(),
validator=existing_customer_phone_number
)
class LoginVerifySchema(colander.MappingSchema):
token = colander.SchemaNode(
colander.String(),
validator=validate_token
)
login = Service(
name='restlogin',
path=path('login'),
renderer='json',
accept='application/json')
@login.post(require_csrf=False, schema=LoginInitSchema())
def post_login(request):
"""
Manual testing.
:param request:
:return:
"""
prepare_phone_verification(request, request.validated["phone_number"])
return dict(status='success')
@login.put(require_csrf=False, schema=LoginVerifySchema())
def verify_login(request):
phone_number = request.validated['phone_number']
customer = list(get_customers_for_phone_number(request.dbsession, phone_number))[0]
# create a user instance for this customer
user = User(
username=phone_number,
registration_source="smslogin",
activated_at=now()
)
request.dbsession.add(user)
request.dbsession.flush()
request.registry.notify(UserCreated(request, user))
customer.users.append(user)
# and log the user in
login_service = get_login_service(request)
login_service.authenticate_user(user, "")
return dict(status='success')
import random
import logging
import json
import time
from pyramid.httpexceptions import HTTPFound
from pyramid.response import Response
from pyramid_sms.utils import normalize_us_phone_number
from pyramid_sms.outgoing import send_templated_sms
from websauna.system.core.redis import get_redis
from websauna.system.http import Request
LOGIN_VERIFICATION_REDIS_HKEY = "login_verification_token"
logger = logging.getLogger(__name__)
def set_verification_token(request, token_type, phone_number, next_url=None):
redis = get_redis(request.registry)
email_token_expiration_time = int(request.registry.settings.get("smslogin.email_token_expiration_time", 300))
token = 10000000 + random.randint(0, 80000000)
expires = time.time() + email_token_expiration_time
if token_type == "phone_number":
manual_code = 1000 + random.randint(0, 8999)
else:
manual_code = None
data = {
"token_type": token_type,
"expires": expires,
"token": token,
"next_url": next_url,
"phone_number": phone_number,
"manual_code": manual_code,
}
request.session["phone_number"] = phone_number
redis.hset(LOGIN_VERIFICATION_REDIS_HKEY, phone_number, json.dumps(data))
return token, data
def prepare_phone_verification(request: Request, phone_number: str, next_url=None) -> Response:
"""
:param request:
:param phone_number:
:param next_url: Optional recorded next URL where the user should land after login.
"""
phone_number = normalize_us_phone_number(phone_number)
token, data = set_verification_token(request, "phone_number", phone_number, next_url=next_url)
context = dict(manual_code=data["manual_code"])
send_templated_sms(request, "login/sms/phone_verification.txt", context, phone_number,)
class SMSTokenVerifyException(Exception):
def __init__(self, msg):
self.msg = msg
def verify_token(request: Request, token: str):
"""Verify SMS login token.
:param request:
:param token: Code entered by the user
:param exception: Use this factory to produce an exception if validation fails with an error message.
:return: Verified phone number
"""
redis = get_redis(request.registry)
phone_number = request.session.get("phone_number")
if not phone_number:
raise SMSTokenVerifyException("Please enter phone number first.")
token_data = redis.hget(LOGIN_VERIFICATION_REDIS_HKEY, phone_number)
if not token_data:
raise SMSTokenVerifyException("No SMS login in process for current user.")
# Allow use the code only once, then erase
redis.hdel(LOGIN_VERIFICATION_REDIS_HKEY, token)
data = json.loads(token_data.decode("utf-8"))
# Only phone number tokens in this view
if data["token_type"] != "phone_number":
raise SMSTokenVerifyException('token type mismatch, must be phone, was {token_type}'.format(**data))
if time.time() > data["expires"]:
raise SMSTokenVerifyException("Login link expired. Please try again.")
if str(data["manual_code"]) != token:
raise SMSTokenVerifyException("The verification code did not match. Please try again.")
phone_number = data["phone_number"]
return phone_number
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment