Skip to content

Instantly share code, notes, and snippets.

@miohtama
Created August 23, 2015 13:41
Show Gist options
  • Save miohtama/e85ef31110ce2e0d7c38 to your computer and use it in GitHub Desktop.
Save miohtama/e85ef31110ce2e0d7c38 to your computer and use it in GitHub Desktop.
PrestoDoctor OAuth implementation with Authomatic, Pyramid and Python 3.x
"""Prestodoctor OAuth on Authomatic implementation with data import and mapping to internal database."""
from argparse import Namespace
import time
from authomatic.core import json_qs_parser
from authomatic.providers.oauth2 import OAuth2
from websauna.system.model import now
from websauna.system.user.social import EmailSocialLoginMapper, \
NotSatisfiedWithData
from trees.models import UserMedia
# Module authorship and licensing metadata
__author__ = "Mikko Ohtamaa <mikko@opensourcehacker.com>"
__license__ = "AGPL"
class PrestodoctorAuthomatic(OAuth2):
    """Prestodoctor Authomatic OAuth2 implementation.

    On top of the regular OAuth2 user update, the parsed payloads of three
    Prestodoctor API endpoints are attached to the Authomatic user object as
    ``base_data``, ``recommendation_data`` and ``photo_data``.

    * Docs: https://github.com/PrestoDoctor/omniauth-prestodoctor/blob/master/lib/omniauth/strategies/prestodoctor.rb
    """

    user_authorization_url = 'https://prestodoctor.com/oauth/authorize'
    access_token_url = 'https://prestodoctor.com/oauth/token'
    user_info_url = 'https://prestodoctor.com/api/v1/user'

    #: Root of the user API; sub-resources are addressed as ``<root>/<name>``
    info_base_url = "https://prestodoctor.com/api/v1/user"

    #: Comes from config file
    user_info_scope = []

    def _x_scope_parser(self, scope):
        """Return the space separated scope string.

        The ``scope`` argument is ignored and a fixed set of scopes is always
        returned.
        NOTE(review): the three names look like they match the three endpoints
        read in ``_update_or_create_user`` - confirm against the Presto API docs.
        """
        return 'user_info recommendation photo_id'

    def _presto_api_data(self, url):
        """Fetch ``url`` through ``self.access`` and return the parsed payload.

        An empty or missing payload is normalised to an empty dictionary so
        that callers can always use dictionary methods on the result.
        """
        return self.access(url, content_parser=json_qs_parser).data or {}

    def _update_or_create_user(self, data, credentials=None, content=None):
        """Fetch user info from Prestodoctor specific endpoints.

        :param data: Passed through to the parent implementation
        :param credentials: Passed through to the parent implementation
        :param content: Passed through to the parent implementation
        :return: ``self.user`` with ``base_data``, ``recommendation_data`` and
            ``photo_data`` attributes set (each always a dictionary)
        """
        super()._update_or_create_user(data, credentials, content)

        # Base data gets the same empty-dict fallback as the two payloads
        # below, so consumers calling .copy() / .get() on it never hit None.
        self.user.base_data = self._presto_api_data(self.info_base_url)

        # Recommendation data might be empty if the user has not done medical evaluation yet
        self.user.recommendation_data = self._presto_api_data(self.info_base_url + "/recommendation")

        self.user.photo_data = self._presto_api_data(self.info_base_url + "/photo_id")

        return self.user
class PrestodoctorMapper(EmailSocialLoginMapper):
    """Map Prestodoctor external users to our database."""

    def import_social_media_user(self, user):
        """Convert incoming Authomatic object to info dictionary.

        :param user: Authomatic user object carrying ``base_data``,
            ``photo_data`` and ``recommendation_data`` dictionaries
        :return: Copy of ``base_data`` with copies of the other two
            dictionaries stored under ``photo_id`` and ``recommendation``
        """
        # Merge all separate data sources to a single dictionary
        info = user.base_data.copy()
        info["photo_id"] = user.photo_data.copy()
        info["recommendation"] = user.recommendation_data.copy()
        return info

    def update_first_login_social_data(self, user, data):
        """Update user full name on the first login only.

        :param user: Our user object
        :param data: Info dictionary; ``first_name`` and ``last_name`` are read
        """
        super().update_first_login_social_data(user, data)
        user.full_name = data["first_name"] + " " + data["last_name"]

    def update_full_presto_data(self, user, info: Namespace):
        """Download copy of Prestodoctor photo files to local server.

        Set user's medical license verified if it looks good.

        :param user: Our user object
        :param info: Prestodoctor data in dotted notation; ``recommendation``
            and ``photo_id`` may be empty namespaces
        """
        user.license_initial_upload_completed_at = now()

        # Trust Prestodoctor licenses if they are not expired.
        # The recommendation namespace is empty when the user has no (or no
        # longer any) recommendation, so a missing expiry means "not trusted".
        expires = getattr(info.recommendation, "expires", None)
        if expires is not None and expires > time.time():
            user.license_verified_by = None
            user.license_verified_at = now()
            user.presto_license_number = info.recommendation.id_num
            user.medical_license_upload_completed_at = now()

        # Download copy of government issued id so the driver can check this is the right person.
        # Skipped when Prestodoctor gave us no photo id to download.
        photo_url = getattr(info.photo_id, "url", None)
        if photo_url:
            driving_license = UserMedia.fetch_from_url(self.registry, photo_url, user=user)
            driving_license.approved_by = None
            driving_license.approved_at = now()
            driving_license.store_bbb_copy(self.registry, "driving")  # Backwards compatibility

        # Set a marker for tests so we know we don't do this operation twice
        user.user_data["social"]["prestodoctor"]["full_data_updated_at"] = now().isoformat()

    def update_every_login_social_data(self, user, data):
        """Update user data every time they relogin.

        :param user: Our user object
        :param data: Info dictionary as built by ``import_social_media_user``
        """
        # If the user has been using our system before, get the current active prestodoctor recommendation issued date.
        # This must be read before the parent call below.
        # NOTE(review): presumably the parent call replaces the stored prestodoctor data - confirm.
        last_known_recommendation_issued = user.user_data["social"].get("prestodoctor", {}).get("recommendation", {}).get("issued", None)

        super().update_every_login_social_data(user, data)

        # Convert data to dotted notation for less typing below
        # http://stackoverflow.com/a/16279578/315168
        info = Namespace(**data)
        info.address = Namespace(**info.address)
        info.photo_id = Namespace(**info.photo_id)
        info.recommendation = Namespace(**info.recommendation)

        # Map Presto fields to our fields
        mappings = {
            "dob": info.dob,
            "photo_url": info.photo,
            "country": "US",
            "zipcode": info.address.zip5,
            "zip4": info.address.zip4,
            "gender": None,
            "first_name": info.first_name,
            "last_name": info.last_name,
            "full_name": info.first_name + " " + info.last_name,
            "city": info.address.city,
            "state": info.address.state,
            "postal_code": info.address.zip5,
            "address": info.address.address1,
            "apartment": info.address.address2,
            "external_data_updated": now().isoformat(),
        }

        # TODO: Set user medical license verified if Presto gives us its number

        # Update internal structure. Only override existing value if we have data from Presto.
        # E.g. phone number might be missing, but we have it, so we don't want to replace existing phone number with empty string.
        for key, value in mappings.items():
            if value:
                user.user_data[key] = value

        current_issued = user.user_data["social"]["prestodoctor"]["recommendation"].get("issued")
        if current_issued != last_known_recommendation_issued:
            # The prestodoctor evaluation issue date has changed, do the heavy data update
            self.update_full_presto_data(user, info)

    def capture_social_media_user(self, request, result):
        """Extract social media information from the Authomatic login result in order to associate the user account.

        :param request: Current request, passed on to the user lookup
        :param result: Authomatic login result; ``result.user.base_data`` is read
        :return: The user returned by ``get_or_create_user_by_social_medial_email``
        :raise NotSatisfiedWithData: If Prestodoctor did not give us an email address
        """
        # Should not happen
        assert not result.error

        email = result.user.base_data.get("email")

        if not email:
            # We cannot login if Prestodoctor doesn't give us email as we use it for the user mapping
            raise NotSatisfiedWithData("Email address is needed in order to use this service and we could not get one from your social media provider. Please try to sign up with your email instead.")

        user = self.get_or_create_user_by_social_medial_email(request, result.user)

        return user
#: This map is to satisfy Authomatic module loader
#: NOTE(review): presumably Authomatic looks this name up in provider modules - confirm against the Authomatic docs
PROVIDER_ID_MAP = [PrestodoctorAuthomatic]
import pytest
import transaction
from trees.tests.utils import start_order, fill_in_delivery_details, \
confirm_delivery
from trees.usermodels import User
# Login details and phone number used by the functional tests below.
# All values are blank in this listing.
# NOTE(review): presumably these must be filled in with real Prestodoctor test accounts before the tests can pass - confirm.
PRESTO_USER_WITH_RECOMMENDATION = ""
PRESTO_USER_WITHOUT_RECOMMENDATION = ""
PRESTO_PASSWORD = ""
PHONE_NUMBER = ""
@pytest.fixture
def non_evaluated_user_browser(request, browser_instance_getter):
    """Browser instance requested separately for the non-evaluated user tests.

    Selenium/splinter/pytest-splinter does not properly clean the browser between tests.

    https://github.com/pytest-dev/pytest-splinter/issues/49
    """
    separate_browser = browser_instance_getter(request, non_evaluated_user_browser)
    return separate_browser
def do_presto_login(browser, presto_user=PRESTO_USER_WITH_RECOMMENDATION, presto_password=PRESTO_PASSWORD):
    """Fill in and submit the Presto sign-in form, approving the permission dialog if it shows up.

    This will cause an alert in your Presto login management which you need to clear later.
    """
    submit_button = "input[name='commit']"

    assert browser.is_text_present("Sign in to your account")

    # Fill in Presto login page
    form_values = (
        ("user[email]", presto_user),
        ("user[password]", presto_password),
    )
    for field_name, field_value in form_values:
        browser.fill(field_name, field_value)
    browser.find_by_css(submit_button).click()

    # First time login pops up allow permission dialog.
    if not browser.is_text_present("Authorization required"):
        return
    browser.find_by_css(submit_button).click()
def do_presto_login_if_needed(browser, presto_user=PRESTO_USER_WITH_RECOMMENDATION, presto_password=PRESTO_PASSWORD):
    """Log in to Presto only when the sign-in page is actually shown.

    For subsequent tests the Prestodoctor auth keys remain active and we don't need to enter username and password again.
    Redirecting through OAuth provider endpoint is enough.
    """
    sign_in_page_shown = browser.is_text_present("Sign in to your account")
    if not sign_in_page_shown:
        return
    do_presto_login(browser, presto_user, presto_password)
def test_presto_login(web_server, browser, DBSession, init):
    """Login / sign up with Prestodoctor and check the imported user data.

    Presto application must be configured as web application, running in http://localhost:8521/.

    Example invocation: PRESTO_USER="040xxxXXXX" PRESTO_PASSWORD="yyyy" py.test trees -s --splinter-webdriver=firefox --splinter-make-screenshot-on-failure=false --ini=test.ini -k test_presto_login

    :param web_server: Py.test fixture, gives HTTP address where the functional test web server is running, ``http://localhost:8521/``
    :param browser: Py.test Splinter Browser fixture
    :param DBSession: Py.test SQLAlchemy session
    :param init: Websauna configuration object
    """
    login_url = "{}/login".format(web_server)

    # Initiate Presto login with Authomatic
    browser.visit(login_url)
    browser.find_by_css(".btn-login-prestodoctor").click()
    do_presto_login_if_needed(browser)
    assert browser.is_text_present("You are now logged in")

    # See that we got somewhat sane data
    with transaction.manager:
        assert DBSession.query(User).count() == 1
        account = DBSession.query(User).get(1)

        assert account.first_login
        assert account.email == PRESTO_USER_WITH_RECOMMENDATION
        assert account.activated_at
        assert account.last_login_ip == "127.0.0.1"

        # Check user basic data
        assert account.full_name == 'Test Oauth1'
        assert account.user_data["social"]["prestodoctor"]["dob"] == -621648001
        expected_location = (
            ("address", "123 MARKET ST"),
            ("city", "SAN FRANCISCO"),
            ("state", "CA"),
            ("zipcode", "94105"),
        )
        for attribute, expected in expected_location:
            assert getattr(account, attribute) == expected

        # License details
        assert account.presto_license_number == 692624515
        completion_markers = (
            "medical_license_upload_completed_at",
            "driving_license_upload_completed_at",
            "license_initial_upload_completed_at",
        )
        for attribute in completion_markers:
            assert getattr(account, attribute)

    # Generated by our backend on successful oauth login
    assert browser.is_text_present("You are now logged in")

    browser.find_by_css("#nav-logout").click()
    assert browser.is_text_present("You are now logged out")
def test_presto_double_login(web_server, browser, DBSession, init):
    """Login Presto user twice and see we do heavy data import only once.

    The ``full_data_updated_at`` marker stored after the first login must be
    unchanged after the second login.
    """
    login_url = "{}/login".format(web_server)

    # Initiate Presto login with Authomatic
    browser.visit(login_url)
    browser.find_by_css(".btn-login-prestodoctor").click()
    do_presto_login_if_needed(browser)
    assert browser.is_text_present("You are now logged in")

    # See that we got somewhat sane data
    with transaction.manager:
        assert DBSession.query(User).count() == 1
        account = DBSession.query(User).get(1)

        # Grab timestamp of full data update
        presto_data = account.user_data["social"]["prestodoctor"]
        full_data_updated_at = presto_data["full_data_updated_at"]

    browser.find_by_css("#nav-logout").click()

    # Go again
    browser.visit(login_url)
    browser.find_by_css(".btn-login-prestodoctor").click()
    do_presto_login_if_needed(browser)
    assert browser.is_text_present("You are now logged in")

    with transaction.manager:
        assert DBSession.query(User).count() == 1
        account = DBSession.query(User).get(1)

        # The marker must not have moved on the second login
        presto_data = account.user_data["social"]["prestodoctor"]
        assert presto_data["full_data_updated_at"] == full_data_updated_at
def test_presto_non_evaluated_user(web_server, non_evaluated_user_browser, DBSession, init):
    """Login Presto user who has no evaluation done yet.

    Basic profile data must be imported, but all license related fields must stay empty.
    """
    browser = non_evaluated_user_browser
    login_url = "{}/login".format(web_server)

    # Initiate Presto login with Authomatic
    browser.visit(login_url)
    browser.find_by_css(".btn-login-prestodoctor").click()
    do_presto_login_if_needed(browser, presto_user=PRESTO_USER_WITHOUT_RECOMMENDATION)
    assert browser.is_text_present("You are now logged in")

    # See that we got somewhat sane data
    with transaction.manager:
        assert DBSession.query(User).count() == 1
        account = DBSession.query(User).get(1)

        assert account.first_login
        assert account.email == PRESTO_USER_WITHOUT_RECOMMENDATION
        assert account.activated_at
        assert account.last_login_ip == "127.0.0.1"

        # Check user basic data
        assert account.full_name == 'Test Oauth2'
        assert account.user_data["social"]["prestodoctor"]["dob"] == -621648001
        expected_location = (
            ("address", "123 MARKET ST"),
            ("city", "SAN FRANCISCO"),
            ("state", "CA"),
            ("zipcode", "94105"),
        )
        for attribute, expected in expected_location:
            assert getattr(account, attribute) == expected

        # License details should be empty
        empty_license_fields = (
            "presto_license_number",
            "medical_license_upload_completed_at",
            "driving_license_upload_completed_at",
            "license_initial_upload_completed_at",
        )
        for attribute in empty_license_fields:
            assert not getattr(account, attribute)
def test_presto_order(web_server, browser, DBSession, init):
    """Do direct-to-buy now with licensed Presto user.

    This should go directly to thank you page, no medical evaluation questions needed.
    """
    start_order(web_server, browser, init, login_needed=False)

    assert browser.is_text_present("Sign in to buy")
    login_button = browser.find_by_css(".btn-login-prestodoctor")
    login_button.click()

    do_presto_login_if_needed(browser)

    # Generated by our backend on successful oauth login
    assert browser.is_text_present("You are now logged in")

    # Assert we are on the order page
    assert browser.is_text_present("Checkout")

    fill_in_delivery_details(browser, phone_number=PHONE_NUMBER, email=None)
    confirm_delivery(browser, membership=True)

    assert browser.is_element_visible_by_css("#thank-you")
def test_presto_order_non_evaluated_user(web_server, non_evaluated_user_browser, DBSession, init):
    """Do direct-to-buy now with a Presto user who has no evaluation done yet.

    After confirming the delivery the ``#medical-recommendation`` element must be visible.
    """
    browser = non_evaluated_user_browser

    start_order(web_server, browser, init, login_needed=False)

    assert browser.is_text_present("Sign in to buy")
    login_button = browser.find_by_css(".btn-login-prestodoctor")
    login_button.click()

    do_presto_login_if_needed(browser, presto_user=PRESTO_USER_WITHOUT_RECOMMENDATION)

    # Generated by our backend on successful oauth login
    assert browser.is_text_present("You are now logged in")

    # Assert we are on the order page
    assert browser.is_text_present("Checkout")

    fill_in_delivery_details(browser, phone_number=PHONE_NUMBER, email=None)
    confirm_delivery(browser, membership=True)

    assert browser.is_element_visible_by_css("#medical-recommendation")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment