SMS based login for React and Websauna app
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Allows users to sign up and log in with their phone number, if that number is associated with an
existing customer record.
""" | |
import logging | |
import colander | |
from cornice.service import Service | |
from pyramid.exceptions import HTTPBadRequest | |
from pyramid_sms.utils import normalize_us_phone_number | |
from .user import serialize_user | |
from websauna.system.user.events import UserCreated | |
from websauna.system.user.utils import get_login_service | |
from websauna.utils.time import now | |
from ..smslogin import prepare_phone_verification, verify_token, SMSTokenVerifyException | |
from ..models import User, get_customers_for_phone_number | |
logger = logging.getLogger(__name__) | |
def path(service):
    """Build the path (route pattern) under which a REST service is exposed.

    All services share the ``/-/`` prefix so that they can be easily
    discriminated in Nginx.

    :param service: Name of the service; it is lower-cased before use.
    :return: The path string ``/-/<lower-cased service name>``.
    """
    route_name = service.lower()
    return "/-/{}".format(route_name)
@colander.deferred
def existing_customer_phone_number(node, kw):
    """Deferred colander validator: the phone number must match a customer.

    :param node: The schema node the deferred value is resolved for.
    :param kw: Keyword arguments for the deferred; must contain ``'request'``.
    :return: A validator callable ``(node, value)`` that raises
        ``colander.Invalid`` when no customer is found for the number.
    """
    current_request = kw['request']

    def _require_known_customer(field, raw_number):
        # Look customers up by the normalized form of whatever was submitted.
        normalized = normalize_us_phone_number(raw_number)
        matches = list(
            get_customers_for_phone_number(current_request.dbsession, normalized)
        )
        if not matches:
            raise colander.Invalid(field, 'No customer with this phone number')

    return _require_known_customer
@colander.deferred
def validate_token(node, kw):
    """Deferred colander validator for the SMS verification token.

    The returned validator strips the submitted token, checks that it consists
    of exactly four ASCII digits and then passes it to ``verify_token``. On
    success the value returned by ``verify_token`` is stored in
    ``request.validated['phone_number']`` for the view to pick up.

    :param node: The schema node the deferred value is resolved for.
    :param kw: Keyword arguments for the deferred; must contain ``'request'``.
    :return: A validator callable ``(node, token)``.
    :raises colander.Invalid: (from the validator) if the token is malformed
        or ``verify_token`` raises ``SMSTokenVerifyException``.
    """
    request = kw['request']

    def validator(node, token):
        token = token.strip()
        # str.isnumeric() also accepts fractions, CJK numerals, circled digits
        # and the like; a verification code can only consist of ASCII digits.
        if not token or any(char not in '0123456789' for char in token):
            raise colander.Invalid(node, 'malformed verification token (must be numeric)')
        if len(token) != 4:
            raise colander.Invalid(node, 'malformed verification token')
        try:
            request.validated['phone_number'] = verify_token(request, token)
        except SMSTokenVerifyException as exc:
            # Guard against an exception raised without a message so that the
            # client gets a validation error instead of an IndexError here.
            message = exc.args[0] if exc.args else 'invalid verification token'
            raise colander.Invalid(node, message) from exc

    return validator
class LoginInitSchema(colander.MappingSchema):
    """Payload for starting an SMS login: a single ``phone_number`` string.

    The value is checked by the ``existing_customer_phone_number`` validator.
    """

    phone_number = colander.SchemaNode(colander.String(), validator=existing_customer_phone_number)
class LoginVerifySchema(colander.MappingSchema):
    """Payload for completing an SMS login: a single ``token`` string.

    The value is checked by the ``validate_token`` validator.
    """

    token = colander.SchemaNode(colander.String(), validator=validate_token)
# REST service for the SMS login flow; post_login and verify_login below
# register their handlers on it.
login = Service(
    name="restlogin",
    path=path("login"),
    accept="application/json",
    renderer="json",
)
@login.post(require_csrf=False, schema=LoginInitSchema())
def post_login(request):
    """Start an SMS login for the phone number submitted by the client.

    Checks that a customer with this number belongs to the organization of
    the current request and, if so, prepares the phone verification.

    :param request: Request whose ``validated`` mapping holds ``phone_number``.
    :return: ``{'status': 'success'}`` once verification has been prepared.
    :raises HTTPBadRequest: If no customer with this phone number belongs to
        ``request.organization``.
    """
    # The schema validator looks customers up by the *normalized* number but
    # does not store the normalized value, so normalize here as well to query
    # (and verify) the very same number that passed validation.
    phone_number = normalize_us_phone_number(request.validated["phone_number"])
    customers = get_customers_for_phone_number(request.dbsession, phone_number)
    belongs_to_organization = any(
        customer.organization.id == request.organization.id
        for customer in customers
    )
    if not belongs_to_organization:
        raise HTTPBadRequest('No customer registered with this phone number for this organization')
    prepare_phone_verification(request, phone_number)
    return dict(status='success')
def _pick_customer_for_request(request, customers):
    """Return the customer of the request's organization, else the first one."""
    organization = getattr(request, 'organization', None)
    if organization is not None:
        for candidate in customers:
            if candidate.organization.id == organization.id:
                return candidate
    # No organization on the request or no match: keep the previous behavior.
    return customers[0]


@login.put(require_csrf=False, schema=LoginVerifySchema())
def verify_login(request):
    """Complete an SMS login after the verification token has been validated.

    The schema validator stores the verified phone number in
    ``request.validated['phone_number']``. The matching customer's first user
    is logged in; if the customer has no user yet, one is created and a
    ``UserCreated`` event is emitted.

    :param request: Request whose ``validated`` mapping holds ``phone_number``.
    :return: The result of ``serialize_user`` for the logged-in user.
    :raises HTTPBadRequest: If no customer is registered with the phone number.
    """
    phone_number = request.validated['phone_number']
    customers = list(get_customers_for_phone_number(request.dbsession, phone_number))
    if not customers:
        raise HTTPBadRequest('No customer registered with this phone number')

    # post_login only accepts customers of the current organization, so prefer
    # that customer here instead of blindly taking the first match.
    # TODO: handle users that are customers at multiple organizations
    customer = _pick_customer_for_request(request, customers)
    logger.debug("Starting login for phone number %s", phone_number)

    # A single first() replaces the former count() + first() pair of queries.
    user = customer.users.first()
    if user is None:
        user = User(
            username=phone_number,
            registration_source="sms_login",
            email=customer.external_data.get('email'),
            activated_at=now(),
        )
        customer.users.append(user)
        # Flush before notifying listeners of the newly created user.
        request.dbsession.flush()
        request.registry.notify(UserCreated(request, user))

    if not user.full_name:
        user.full_name = customer.name

    # and log the user in
    login_service = get_login_service(request)
    login_service.authenticate_user(user, "sms_login")
    logger.info("Login success %s", user)
    return serialize_user(user)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment