Skip to content

Instantly share code, notes, and snippets.

@dcollien
Last active August 16, 2018 05:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dcollien/8d7bd74cffa36235db58bce61845524e to your computer and use it in GitHub Desktop.
Save dcollien/8d7bd74cffa36235db58bce61845524e to your computer and use it in GitHub Desktop.
OAuth2.0 Sign-On for various providers, and retrieving user details: id, name, email, photo
class Config(object):
    """Bag of provider settings plus helpers for reading the user's profile.

    Every keyword argument given to the constructor becomes an instance
    attribute.  Attributes that were never supplied read as ``None`` (see
    ``__getattr__``), so optional settings can be probed without ``hasattr``.

    The profile helpers all take the same two arguments:

    * ``session`` -- an object with a ``get(url)`` method whose result has
      a ``json()`` method.
    * ``cache`` -- a mutable mapping; the decoded profile is stored under
      the ``'user'`` key so it is requested at most once per cache.
    """

    def __init__(self, **entries):
        self.__dict__.update(entries)
        self._entries = entries

    def __repr__(self):
        return "Config(%s)" % str(self._entries)

    def __getattr__(self, value):
        # Only reached when normal attribute lookup fails, i.e. for settings
        # that were not passed to the constructor.
        return None

    def _get_user(self, session, cache):
        """Return the profile document, requesting it on first use."""
        if cache.get('user') is None:
            cache['user'] = self.get(session, self.user_api).json()
        return cache.get('user')

    def get_user_id(self, session, cache):
        """Return the profile value stored under ``user_id_field``."""
        return self._get_user(session, cache)[self.user_id_field]

    def get_full_name(self, session, cache):
        """Return the profile value stored under ``user_name_field``."""
        return self._get_user(session, cache)[self.user_name_field]

    def get_email(self, session, cache):
        """Return the profile value stored under ``user_email_field``."""
        return self._get_user(session, cache)[self.user_email_field]

    def get_image_url(self, session, cache):
        """Return the profile value stored under ``user_image_field``."""
        return self._get_user(session, cache)[self.user_image_field]

    def get_image_file(self, session, cache):
        """Download the profile image and return it as a binary file object.

        Returns ``None`` when there is no image URL or when the download
        does not answer with a 2xx status.
        """
        # Was ``self.get_image(...)``: no such method exists, so __getattr__
        # produced None and the call raised TypeError.
        url = self.get_image_url(session, cache)
        if url is None:
            return None

        # Imported lazily, after the early return, so the no-image path
        # needs neither the dependency nor the network.
        import requests
        from io import BytesIO

        image_response = requests.get(url)
        if 200 <= image_response.status_code < 300:
            # The payload is raw bytes, hence BytesIO rather than StringIO.
            return BytesIO(image_response.content)
        return None

    def get(self, session, path):
        """GET ``path`` relative to ``api_base_url`` using ``session``."""
        # Normalise the slashes so exactly one separates base and path.
        url = '/'.join([self.api_base_url.rstrip('/'), path.lstrip('/')])
        return session.get(url)
import flask
import os
from oauth_flow import OAuth2Flow
from providers import Github, Microsoft, Facebook, LinkedIn, Google
# The Flask application that the route decorators in this module attach to.
app = flask.Flask(__name__)
# NOTE(review): debug mode and a fixed, guessable secret key look like
# development-only settings -- confirm they are overridden before deploying.
app.debug = True
app.secret_key = 'development'
# Public base URL of this app; it is prefixed to the per-provider
# "/<provider>/login" paths to form the OAuth redirect URIs.
base_uri = "https://mysite.ngrok.io"
# Provider registry, keyed by the name used as the first URL path segment.
# Use the urls in comments to register apps for these providers
configurations = dict(
    # https://github.com/settings/applications/new
    github=Github("key", "secret"),
    # https://apps.dev.microsoft.com/#/appList
    microsoft=Microsoft("key", "secret", base_uri + '/microsoft/login'),
    # https://developers.facebook.com/apps
    facebook=Facebook("key", "secret", base_uri + '/facebook/login'),
    # https://www.linkedin.com/secure/developer
    linkedin=LinkedIn("key", "secret", base_uri + "/linkedin/login"),
    # https://console.cloud.google.com/apis/credentials
    google=Google("key", "secret", base_uri + '/google/login'),
)
@app.route('/<config>')
def start(config):
    """Begin the OAuth2 flow for the provider named in the URL.

    Stores the flow's ``state`` value in the Flask session and redirects
    the browser to the provider's authorization URL.  Responds with 404
    when ``config`` is not a key of ``configurations``.
    """
    provider = configurations.get(config)
    if provider is None:
        # This route matches any single path segment (e.g. /favicon.ico);
        # an unknown name used to raise KeyError and surface as a 500.
        flask.abort(404)

    flow = OAuth2Flow(provider)
    url, state = flow.authorization_url()
    flask.session['state'] = state
    return flask.redirect(url)
@app.route('/<config>/login')
def login(config):
    """Handle the provider's redirect back and show the user's details.

    Exchanges the authorization response for an authorized session, then
    renders the user's name, image, email and id.  Responds with 404 for
    an unknown provider and 400 when the Flask session holds no ``state``
    (i.e. the flow was not started from this browser session).
    """
    provider = configurations.get(config)
    if provider is None:
        flask.abort(404)

    state = flask.session.get('state')
    if state is None:
        flask.abort(400)

    # This is a hack for local dev, don't do this in production.
    # Only the leading scheme is rewritten; a count of 1 keeps any
    # "http://" that appears later in the query string intact.
    url = flask.request.url.replace('http://', 'https://', 1)

    flow = OAuth2Flow(provider)
    session = flow.get_authorized_session(state, url)

    # The values come from the provider's profile data, so they are rendered
    # through Jinja (autoescaped for string templates) rather than
    # str.format, which would inject them into the HTML verbatim.
    return flask.render_template_string(
        """
<html>
<body>
<p>
Hello {{ name }}<br/>
<img src="{{ image }}"/>
<br/>
Email: {{ email }}<br/>
ID: {{ id }}<br/>
</p>
</body>
</html>""",
        name=session.full_name,
        image=session.image_url,
        email=session.email,
        id=session.user_id,
    )
if __name__ == '__main__':
    # Serve on every interface; the port is read from $PORT and falls back to 5000.
    listen_port = int(os.getenv('PORT', 5000))
    app.run(host='0.0.0.0', port=listen_port)
from requests_oauthlib import OAuth2Session
from config import Config
class AuthorizedSession(object):
    """Pairs an OAuth2 session with its provider config and a private cache.

    Each property forwards to the config method of the matching name,
    passing the wrapped session and this object's cache dict.
    """

    def __init__(self, oauth2_session, config):
        self.session = oauth2_session
        self.config = config
        self.cache = {}

    def _ask(self, getter):
        """Invoke a config getter with this object's session and cache."""
        return getter(self.session, self.cache)

    @property
    def user(self):
        """Whatever ``config._get_user`` returns for this session."""
        return self._ask(self.config._get_user)

    @property
    def user_id(self):
        """Result of ``config.get_user_id``."""
        return self._ask(self.config.get_user_id)

    @property
    def full_name(self):
        """Result of ``config.get_full_name``."""
        return self._ask(self.config.get_full_name)

    @property
    def email(self):
        """Result of ``config.get_email``."""
        return self._ask(self.config.get_email)

    @property
    def image_url(self):
        """Result of ``config.get_image_url``."""
        return self._ask(self.config.get_image_url)

    @property
    def image_file(self):
        """Result of ``config.get_image_file``."""
        return self._ask(self.config.get_image_file)

    def get(self, path):
        """Forward ``path`` to ``config.get`` together with the session."""
        return self.config.get(self.session, path)
class OAuth2Flow(object):
    """Drives the two steps of an OAuth2 authorization-code exchange.

    ``config`` is either a ``Config`` instance or a plain dict of settings,
    which is converted to a ``Config``.  The settings read here are
    ``client_id``, ``client_secret``, ``redirect_uri``, ``scope``,
    ``authorization_base_url``, ``token_url`` and the optional
    ``session_fix`` callable.
    """

    def __init__(self, config):
        if isinstance(config, dict):
            # Config only accepts keyword arguments; passing the dict
            # positionally raised TypeError.
            config = Config(**config)
        self.config = config

    def _new_session(self, **session_kwargs):
        """Build an OAuth2Session for this config and apply ``session_fix``."""
        session = OAuth2Session(
            self.config.client_id,
            redirect_uri=self.config.redirect_uri,
            **session_kwargs
        )
        if self.config.session_fix is not None:
            session = self.config.session_fix(session)
        return session

    def authorization_url(self):
        """Return ``(authorization_url, state)`` for starting the flow."""
        session = self._new_session(scope=self.config.scope)
        authorization_url, state = session.authorization_url(
            self.config.authorization_base_url
        )
        return authorization_url, state

    def get_authorized_session(self, state, authorization_response):
        """Fetch a token and return an ``AuthorizedSession``.

        ``state`` is the value returned earlier by ``authorization_url`` and
        ``authorization_response`` is the full callback URL that was requested.
        """
        session = self._new_session(state=state)
        # The session is used afterwards; the returned token dict is not needed.
        session.fetch_token(
            self.config.token_url,
            client_secret=self.config.client_secret,
            authorization_response=authorization_response
        )
        return AuthorizedSession(session, self.config)
from config import Config
class Github(Config):
    """Settings for signing in with GitHub.

    No ``redirect_uri`` or ``scope`` is configured, so both read as ``None``.
    """

    def __init__(self, client_id, client_secret):
        super(Github, self).__init__(**{
            "client_id": client_id,
            "client_secret": client_secret,
            "authorization_base_url": "https://github.com/login/oauth/authorize",
            "token_url": "https://github.com/login/oauth/access_token",
            "api_base_url": "https://api.github.com/",
            "user_api": "user",
            "user_id_field": "id",
            "user_email_field": "email",
            "user_name_field": "name",
            # GitHub's user object names this field "avatar_url"; the former
            # "avatar" key is not present, so get_image_url raised KeyError.
            "user_image_field": "avatar_url"
        })
class Google(Config):
    """Settings for signing in with Google (email and profile scopes)."""

    def __init__(self, client_id, client_secret, redirect_uri):
        requested_scopes = [
            "https://www.googleapis.com/auth/userinfo.email",
            "https://www.googleapis.com/auth/userinfo.profile"
        ]
        super(Google, self).__init__(
            client_id=client_id,
            client_secret=client_secret,
            redirect_uri=redirect_uri,
            authorization_base_url="https://accounts.google.com/o/oauth2/v2/auth",
            token_url="https://www.googleapis.com/oauth2/v4/token",
            scope=requested_scopes,
            api_base_url="https://www.googleapis.com/oauth2/v1/",
            user_api="userinfo",
            user_id_field="id",
            user_email_field="email",
            user_name_field="name",
            user_image_field="picture",
        )
class Microsoft(Config):
    """Settings for signing in with a Microsoft account.

    No ``user_image_field`` is configured.  The photo is fetched from the
    separate ``me/photo`` endpoints instead, so both image getters are
    overridden.
    """

    def __init__(self, client_id, client_secret, redirect_uri):
        super(Microsoft, self).__init__(**{
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uri": redirect_uri,
            "authorization_base_url": "https://login.microsoftonline.com/common/oauth2/v2.0/authorize",
            "token_url": "https://login.microsoftonline.com/common/oauth2/v2.0/token",
            "scope": ["User.Read"],
            "api_base_url": "https://graph.microsoft.com/beta",
            "user_api": "me",
            "user_id_field": "id",
            "user_email_field": "userPrincipalName",
            "user_name_field": "displayName"
        })

    def _get_photo(self, session):
        """Request the photo bytes; return the response, or None unless 2xx."""
        response = self.get(session, 'me/photo/$value')
        if 200 <= response.status_code < 300:
            return response
        return None

    def get_image_url(self, session, cache):
        """Return the photo as a base64 ``data:`` URI, or None if unavailable.

        ``cache`` is accepted for interface compatibility and is not used.
        """
        from base64 import b64encode

        photo = self._get_photo(session)
        if photo is None:
            return None
        # The content type comes from the photo's metadata document.
        metadata = self.get(session, 'me/photo').json()
        content_type = metadata.get('@odata.mediaContentType', '')
        encoded = b64encode(photo.content).decode('utf-8')
        return "data:{};base64,{}".format(content_type, encoded)

    def get_image_file(self, session, cache):
        """Return the photo as a binary file object, or None if unavailable.

        ``cache`` is accepted for interface compatibility and is not used.
        """
        # io.BytesIO replaces the Python-2-only StringIO module; the
        # response content is bytes.
        from io import BytesIO

        photo = self._get_photo(session)
        if photo is None:
            return None
        return BytesIO(photo.content)
class LinkedIn(Config):
    """Settings for signing in with LinkedIn.

    No ``user_name_field`` is configured; ``get_full_name`` is overridden to
    combine the separate first and last name fields.
    """

    def __init__(self, client_id, client_secret, redirect_uri):
        from requests_oauthlib.compliance_fixes import linkedin_compliance_fix

        super(LinkedIn, self).__init__(
            client_id=client_id,
            client_secret=client_secret,
            redirect_uri=redirect_uri,
            authorization_base_url="https://www.linkedin.com/uas/oauth2/authorization",
            token_url="https://www.linkedin.com/uas/oauth2/accessToken",
            api_base_url="https://api.linkedin.com/v1",
            user_api="people/~:(id,email-address,firstName,lastName,picture-url)?format=json",
            user_id_field="id",
            user_email_field="emailAddress",
            user_image_field="pictureUrl",
            session_fix=linkedin_compliance_fix,
        )

    def get_full_name(self, session, cache):
        """Return the first and last name joined by a single space."""
        profile = self._get_user(session, cache)
        name_parts = (profile['firstName'], profile['lastName'])
        return ' '.join(name_parts)
class Facebook(Config):
    """Settings for signing in with Facebook (Graph API v3.1)."""

    def __init__(self, client_id, client_secret, redirect_uri):
        from requests_oauthlib.compliance_fixes import facebook_compliance_fix

        super(Facebook, self).__init__(
            client_id=client_id,
            client_secret=client_secret,
            redirect_uri=redirect_uri,
            authorization_base_url="https://www.facebook.com/dialog/oauth",
            token_url="https://graph.facebook.com/oauth/access_token",
            api_base_url="https://graph.facebook.com/v3.1",
            user_api="me?fields=id,email,profile_pic,name",
            user_id_field="id",
            user_email_field="email",
            user_image_field="profile_pic",
            user_name_field="name",
            session_fix=facebook_compliance_fix,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment