Skip to content

Instantly share code, notes, and snippets.

@kakky
Last active December 1, 2020 11:39
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kakky/6809432 to your computer and use it in GitHub Desktop.
Save kakky/6809432 to your computer and use it in GitHub Desktop.
Example of Yahoo! JAPAN OpenID Connect (YConnect) backend using python-social-auth
# -*- coding: utf-8 -*-
"""
yahoojp.py - Yahoo! JAPAN OpenID Connect (YConnect) backend for python-social-auth.
https://github.com/omab/python-social-auth
http://developer.yahoo.co.jp/yconnect/
settings.py should include the following:
SOCIAL_AUTH_YAHOOJP_OIDC_KEY = '...'
SOCIAL_AUTH_YAHOOJP_OIDC_SECRET = '...'
SOCIAL_AUTH_YAHOOJP_OIDC_AUTH_EXTRA_ARGUMENTS = {'display': 'touch', 'prompt': 'login'}
SOCIAL_AUTH_YAHOOJP_OIDC_SCOPE = ['profile', 'email', 'address']
AUTHENTICATION_BACKENDS = (
'yourapp.backends.yahoojp.YahoojpOidc',
'django.contrib.auth.backends.ModelBackend',
)
"""
from social.backends.oauth import BaseOAuth2
import base64, json, hmac, hashlib
from calendar import timegm
from datetime import datetime
from social.exceptions import AuthCanceled, AuthTokenError, AuthFailed
from requests import request
class YahoojpOidc(BaseOAuth2):
    """Yahoo! JAPAN OpenID Connect (YConnect v1) authentication backend.

    On top of the OAuth2 flow inherited from ``BaseOAuth2`` this backend:

    * adds a random ``nonce`` to the authorization request and stores it,
    * authenticates against the token endpoint with an HTTP Basic
      ``Authorization`` header,
    * verifies the HMAC-signed ``id_token`` (signature, ``iss``, ``aud``,
      ``exp``, ``iat`` and ``nonce``) before calling the UserInfo endpoint.
    """

    name = 'yahoojp-oidc'
    ID_KEY = 'user_id'
    AUTHORIZATION_URL = 'https://auth.login.yahoo.co.jp/yconnect/v1/authorization'
    ACCESS_TOKEN_URL = 'https://auth.login.yahoo.co.jp/yconnect/v1/token'
    USERINFO_URL = 'https://userinfo.yahooapis.jp/yconnect/v1/attribute'
    ID_TOKEN_ISS = 'https://auth.login.yahoo.co.jp'
    # Maximum accepted age, in seconds, of the id_token ``iat`` claim.
    ID_TOKEN_MAX_AGE = 600
    RESPONSE_TYPE = 'code id_token'
    DEFAULT_SCOPE = ['openid']
    REDIRECT_STATE = False
    ACCESS_TOKEN_METHOD = 'POST'
    EXTRA_DATA = [
        ('access_token', 'access_token'),
        ('token_type', 'token_type'),
        ('expires_in', 'expires_in'),
        ('refresh_token', 'refresh_token'),
        ('id_token', 'id_token'),
        ('id_token_payload', 'id_token_payload'),
    ]

    def auth_params(self, state):
        """Return the authorization request arguments plus a fresh nonce.

        The nonce is stored together with ``state`` so that it can be matched
        against the ``nonce`` claim of the id_token in ``user_data``.
        """
        params = super(YahoojpOidc, self).auth_params(state)
        nonce = self.strategy.random_string(64)
        params['nonce'] = nonce
        self._store_nonce(nonce, state)
        return params

    def auth_complete_params(self, state=None):
        """Return the form parameters for the authorization-code exchange."""
        return {
            'grant_type': 'authorization_code',  # request auth code
            'code': self.data.get('code', ''),  # server response code
            'redirect_uri': self.get_redirect_uri(state),
        }

    def auth_headers(self):
        """Return the token-endpoint headers, including HTTP Basic credentials."""
        client_id, client_secret = self.get_key_and_secret()
        credentials = (client_id + ':' + client_secret).encode()
        # HTTP Basic credentials use the standard base64 alphabet *with*
        # padding (RFC 7617); the URL-safe, unpadded form is only for JWTs.
        basic_auth = base64.b64encode(credentials).decode()
        return {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Authorization': 'Basic ' + basic_auth,
        }

    def user_data(self, access_token, *args, **kwargs):
        """Validate the id_token (if any) and load the user's attributes.

        ``kwargs['response']`` is expected to be the token-endpoint response.
        When it carries an ``id_token`` the token is verified and its payload
        is returned under ``'id_token_payload'``. The UserInfo attributes are
        merged into the same dict.

        Raises:
            AuthTokenError: if the id_token is malformed or fails validation.
        """
        user_info = {}
        response = kwargs.get('response') or {}
        id_token = response.get('id_token')
        if id_token:
            user_info['id_token_payload'] = self._validate_id_token(id_token)
        # UserInfo Request
        attributes = self.get_json(
            self.USERINFO_URL,
            params={'schema': 'openid'},
            headers={'Authorization': 'Bearer ' + access_token},
        )
        user_info.update(attributes)
        return user_info

    def _validate_id_token(self, id_token):
        """Decode ``id_token``, check its claims and consume its nonce.

        Returns the decoded payload dict. Missing or mistyped claims are
        reported as ``AuthTokenError`` rather than ``KeyError``/``TypeError``.
        """
        client_id, client_secret = self.get_key_and_secret()
        payload = self._jwt_decode(id_token, client_secret)
        if not isinstance(payload, dict):
            raise AuthTokenError(
                self, 'Incorrect id_token : Invalid payload string: not an object')
        if payload.get('iss') != self.ID_TOKEN_ISS:
            raise AuthTokenError(self, 'Incorrect id_token : iss')
        if payload.get('aud') != client_id:
            raise AuthTokenError(self, 'Incorrect id_token : aud')
        utc_timestamp = timegm(datetime.utcnow().utctimetuple())
        expires_at = payload.get('exp')
        if not self._is_numeric_date(expires_at) or expires_at < utc_timestamp:
            raise AuthTokenError(self, 'Incorrect id_token : exp')
        issued_at = payload.get('iat')
        if (not self._is_numeric_date(issued_at)
                or issued_at < utc_timestamp - self.ID_TOKEN_MAX_AGE):
            raise AuthTokenError(self, 'Incorrect id_token : iat')
        nonce = payload.get('nonce')
        nonce_obj = self._get_nonce(nonce) if nonce else None
        if not nonce_obj:
            raise AuthTokenError(self, 'Incorrect id_token : nonce')
        # A nonce is single-use: drop it once it has been matched.
        self._remove_nonce(nonce_obj.id)
        return payload

    @staticmethod
    def _is_numeric_date(value):
        """Return True if ``value`` is a real number (bools excluded)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def get_user_details(self, response):
        """Map the provider's attributes to the user-detail fields."""
        return {
            'username': response.get('user_id', ''),
            'email': response.get('email', ''),
            'fullname': response.get('name', ''),
            'first_name': response.get('given_name', ''),
            'last_name': response.get('family_name', ''),
        }

    def refresh_token_params(self, token, *args, **kwargs):
        """Return the form parameters for a refresh-token request."""
        return {
            'refresh_token': token,
            'grant_type': 'refresh_token',
        }

    def refresh_token(self, token, *args, **kwargs):
        """Refresh the access token and turn error responses into exceptions.

        Raises:
            AuthCanceled: if the provider answers ``invalid_grant``.
            AuthFailed: for any other reported error.
        """
        response = super(YahoojpOidc, self).refresh_token(token)
        error = response.get('error_description') or response.get('error')
        if not error:
            return response
        # invalid_grant (401) - Refresh token has expired.
        if response.get('error') == 'invalid_grant':
            raise AuthCanceled(self, error)
        raise AuthFailed(self, error)

    def request(self, url, method='GET', *args, **kwargs):
        """Send an HTTP request, returning 4xx responses instead of raising.

        Client-error responses are handed back as-is so that callers can read
        the ``error`` value from the body; any other error status still raises
        through ``raise_for_status()``.
        """
        kwargs.setdefault('timeout', self.setting('REQUESTS_TIMEOUT') or
                          self.setting('URLOPEN_TIMEOUT'))
        response = request(method, url, *args, **kwargs)
        if not 400 <= response.status_code < 500:
            response.raise_for_status()
        return response

    def _store_nonce(self, nonce, state):
        """Persist ``nonce`` (with ``state``) through the association storage."""
        association = YahoojpOidcAssociation(nonce, assoc_type=state)
        self.strategy.storage.association.store(self.AUTHORIZATION_URL, association)

    def _get_nonce(self, nonce):
        """Return the stored association whose handle is ``nonce``, or None."""
        try:
            return self.strategy.storage.association.get(
                server_url=self.AUTHORIZATION_URL, handle=nonce)[0]
        except Exception:
            # No match (or a storage failure) is treated as "unknown nonce";
            # the caller then rejects the id_token.
            return None

    def _remove_nonce(self, id):
        """Delete the stored association with the given id (best effort)."""
        try:
            self.strategy.storage.association.remove([id])
        except Exception:
            # Cleanup is best effort: a failed delete must not fail the login.
            return None

    def _jwt_decode(self, jwt, key=''):
        """Verify the signature of a compact JWT and return its payload.

        Args:
            jwt: the ``header.payload.signature`` token.
            key: shared secret used for the HMAC signature.

        Raises:
            AuthTokenError: if the token is malformed, uses an algorithm not
                listed in ``_jwt_sign``, or the signature does not match.
        """
        try:
            signing_input, crypto_segment = str(jwt).rsplit('.', 1)
            header_segment, payload_segment = signing_input.split('.', 1)
        except ValueError:
            raise AuthTokenError(self, 'Incorrect id_token : Not enough segments')
        header = self._jwt_load_segment(header_segment, 'header')
        payload = self._jwt_load_segment(payload_segment, 'payload')
        try:
            # The signature is raw binary; it must stay ``bytes`` even when it
            # happens to be valid UTF-8, otherwise the comparison below fails.
            signature = self._base64url_decode_bytes(crypto_segment)
        except (TypeError, ValueError):
            # binascii.Error (bad padding on Python 3) is a ValueError.
            raise AuthTokenError(self, 'Incorrect id_token : Invalid crypto padding')
        algorithm = header.get('alg') if isinstance(header, dict) else None
        try:
            signer = self._jwt_sign[algorithm]
        except (KeyError, TypeError):
            # TypeError covers an unhashable ``alg`` value such as a list.
            raise AuthTokenError(self, 'Incorrect id_token : Algorithm not supported')
        expected = signer(signing_input, key)
        # Constant-time comparison to avoid leaking signature bytes via timing.
        if not hmac.compare_digest(signature, expected):
            raise AuthTokenError(self, 'Incorrect id_token : Signature verification failed')
        return payload

    def _jwt_load_segment(self, segment, label):
        """Base64url-decode and JSON-parse one JWT segment.

        ``label`` ('header' or 'payload') is only used in error messages.
        """
        try:
            return json.loads(self._base64url_decode(segment))
        except TypeError:
            raise AuthTokenError(
                self, 'Incorrect id_token : Invalid %s padding' % label)
        except ValueError as e:
            raise AuthTokenError(
                self, 'Incorrect id_token : Invalid %s string: %s' % (label, e))

    # Supported JWT signing algorithms: name -> callable(msg, key) -> digest.
    _jwt_sign = {
        'HS256': lambda msg, key: hmac.new(key.encode(), msg.encode(), hashlib.sha256).digest(),
        'HS384': lambda msg, key: hmac.new(key.encode(), msg.encode(), hashlib.sha384).digest(),
        'HS512': lambda msg, key: hmac.new(key.encode(), msg.encode(), hashlib.sha512).digest(),
    }

    def _base64url_decode_bytes(self, segment):
        """Decode an unpadded base64url string and always return bytes."""
        padding = '=' * (-len(segment) % 4)
        return base64.urlsafe_b64decode((segment + padding).encode())

    def _base64url_decode(self, input):
        """Decode an unpadded base64url string.

        Returns text when the decoded bytes can be decoded with the default
        codec, otherwise the raw bytes.
        """
        raw = self._base64url_decode_bytes(input)
        try:
            return raw.decode()  # return str
        except UnicodeDecodeError:
            return raw  # return byte
class YahoojpOidcAssociation(object):
    """Use the Association model to save the nonce by force.

    Only ``handle`` (the nonce) and ``assoc_type`` (the state) carry real
    data; the remaining attributes are placeholders.
    """

    def __init__(self, handle, secret='', issued=0, lifetime=0, assoc_type=''):
        """Build the record.

        ``secret`` may be text or bytes; it is always stored as bytes.
        """
        self.handle = handle  # as nonce
        # Calling .encode() on a value that is already bytes would raise
        # AttributeError, so only encode text.
        self.secret = secret if isinstance(secret, bytes) else secret.encode()  # not use
        self.issued = issued  # not use
        self.lifetime = lifetime  # not use
        self.assoc_type = assoc_type  # as state
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment