Created
May 3, 2016 23:39
-
-
Save miohtama/69b5c365ec5e5ddd1d0b2ad2869460e8 to your computer and use it in GitHub Desktop.
SMS authentication for Pyramid and Websauna
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
allows users to sign up and login with their phone number, if that number is assoicated with an | |
existing customer record. | |
""" | |
import colander | |
from cornice.service import Service | |
from pyramid_sms.utils import normalize_us_phone_number | |
from websauna.system.user.events import UserCreated | |
from websauna.system.user.utils import get_login_service | |
from websauna.utils.time import now | |
from .. import path | |
from ..smslogin import prepare_phone_verification, verify_token, SMSTokenVerifyException | |
from ..models import User, get_customers_for_phone_number | |
@colander.deferred | |
def existing_customer_phone_number(node, kw): | |
request = kw['request'] | |
def validator(node, phone_number): | |
phone_number = normalize_us_phone_number(phone_number) | |
customers = list(get_customers_for_phone_number(request.dbsession, phone_number)) | |
if len(customers) == 0: | |
raise colander.Invalid(node, 'No customer with this phone number') | |
return validator | |
@colander.deferred | |
def validate_token(node, kw): | |
request = kw['request'] | |
def validator(node, token): | |
if not token.isnumeric(): | |
raise colander.Invalid(node, 'malformed verification token (must be numeric)') | |
if len(token) != 4: | |
raise colander.Invalid(node, 'malformed verification token') | |
def exception_factory(msg): | |
raise colander.Invalid(node, msg) | |
try: | |
request.validated['phone_number'] = verify_token(request, token) | |
except SMSTokenVerifyException as exc: | |
raise colander.Invalid(node, exc.msg) | |
return validator | |
class LoginInitSchema(colander.MappingSchema): | |
phone_number = colander.SchemaNode( | |
colander.String(), | |
validator=existing_customer_phone_number | |
) | |
class LoginVerifySchema(colander.MappingSchema): | |
token = colander.SchemaNode( | |
colander.String(), | |
validator=validate_token | |
) | |
login = Service( | |
name='restlogin', | |
path=path('login'), | |
renderer='json', | |
accept='application/json') | |
@login.post(require_csrf=False, schema=LoginInitSchema()) | |
def post_login(request): | |
""" | |
Manual testing. | |
:param request: | |
:return: | |
""" | |
prepare_phone_verification(request, request.validated["phone_number"]) | |
return dict(status='success') | |
@login.put(require_csrf=False, schema=LoginVerifySchema()) | |
def verify_login(request): | |
phone_number = request.validated['phone_number'] | |
customer = list(get_customers_for_phone_number(request.dbsession, phone_number))[0] | |
# create a user instance for this customer | |
user = User( | |
username=phone_number, | |
registration_source="smslogin", | |
activated_at=now() | |
) | |
request.dbsession.add(user) | |
request.dbsession.flush() | |
request.registry.notify(UserCreated(request, user)) | |
customer.users.append(user) | |
# and log the user in | |
login_service = get_login_service(request) | |
login_service.authenticate_user(user, "") | |
return dict(status='success') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import logging | |
import json | |
import time | |
from pyramid.httpexceptions import HTTPFound | |
from pyramid.response import Response | |
from pyramid_sms.utils import normalize_us_phone_number | |
from pyramid_sms.outgoing import send_templated_sms | |
from websauna.system.core.redis import get_redis | |
from websauna.system.http import Request | |
LOGIN_VERIFICATION_REDIS_HKEY = "login_verification_token" | |
logger = logging.getLogger(__name__) | |
def set_verification_token(request, token_type, phone_number, next_url=None): | |
redis = get_redis(request.registry) | |
email_token_expiration_time = int(request.registry.settings.get("smslogin.email_token_expiration_time", 300)) | |
token = 10000000 + random.randint(0, 80000000) | |
expires = time.time() + email_token_expiration_time | |
if token_type == "phone_number": | |
manual_code = 1000 + random.randint(0, 8999) | |
else: | |
manual_code = None | |
data = { | |
"token_type": token_type, | |
"expires": expires, | |
"token": token, | |
"next_url": next_url, | |
"phone_number": phone_number, | |
"manual_code": manual_code, | |
} | |
request.session["phone_number"] = phone_number | |
redis.hset(LOGIN_VERIFICATION_REDIS_HKEY, phone_number, json.dumps(data)) | |
return token, data | |
def prepare_phone_verification(request: Request, phone_number: str, next_url=None) -> Response: | |
""" | |
:param request: | |
:param phone_number: | |
:param next_url: Optional recorded next URL where the user should land after login. | |
""" | |
phone_number = normalize_us_phone_number(phone_number) | |
token, data = set_verification_token(request, "phone_number", phone_number, next_url=next_url) | |
context = dict(manual_code=data["manual_code"]) | |
send_templated_sms(request, "login/sms/phone_verification.txt", context, phone_number,) | |
class SMSTokenVerifyException(Exception): | |
def __init__(self, msg): | |
self.msg = msg | |
def verify_token(request: Request, token: str): | |
"""Verify SMS login token. | |
:param request: | |
:param token: Code entered by the user | |
:param exception: Use this factory to produce an exception if validation fails with an error message. | |
:return: Verified phone number | |
""" | |
redis = get_redis(request.registry) | |
phone_number = request.session.get("phone_number") | |
if not phone_number: | |
raise SMSTokenVerifyException("Please enter phone number first.") | |
token_data = redis.hget(LOGIN_VERIFICATION_REDIS_HKEY, phone_number) | |
if not token_data: | |
raise SMSTokenVerifyException("No SMS login in process for current user.") | |
# Allow use the code only once, then erase | |
redis.hdel(LOGIN_VERIFICATION_REDIS_HKEY, token) | |
data = json.loads(token_data.decode("utf-8")) | |
# Only phone number tokens in this view | |
if data["token_type"] != "phone_number": | |
raise SMSTokenVerifyException('token type mismatch, must be phone, was {token_type}'.format(**data)) | |
if time.time() > data["expires"]: | |
raise SMSTokenVerifyException("Login link expired. Please try again.") | |
if str(data["manual_code"]) != token: | |
raise SMSTokenVerifyException("The verification code did not match. Please try again.") | |
phone_number = data["phone_number"] | |
return phone_number | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment