Last active
December 1, 2020 11:39
-
-
Save kakky/6809432 to your computer and use it in GitHub Desktop.
Example of Yahoo! JAPAN OpenID Connect (YConnect) backend
using python-social-auth
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
yahoojp.py - Yahoo! JAPAN OpenID Connect (YConnect) backend for python-social-auth.
https://github.com/omab/python-social-auth | |
http://developer.yahoo.co.jp/yconnect/ | |
settings.py should include the following: | |
SOCIAL_AUTH_YAHOOJP_OIDC_KEY = '...' | |
SOCIAL_AUTH_YAHOOJP_OIDC_SECRET = '...' | |
SOCIAL_AUTH_YAHOOJP_OIDC_AUTH_EXTRA_ARGUMENTS = {'display': 'touch', 'prompt': 'login'} | |
SOCIAL_AUTH_YAHOOJP_OIDC_SCOPE = ['profile', 'email', 'address'] | |
AUTHENTICATION_BACKENDS = ( | |
'yourapp.backends.yahoojp.YahoojpOidc', | |
'django.contrib.auth.backends.ModelBackend', | |
) | |
""" | |
from social.backends.oauth import BaseOAuth2 | |
import base64, json, hmac, hashlib | |
from calendar import timegm | |
from datetime import datetime | |
from social.exceptions import AuthCanceled, AuthTokenError, AuthFailed | |
from requests import request | |
class YahoojpOidc(BaseOAuth2):
    """Yahoo! JAPAN OpenID Connect (YConnect) backend for python-social-auth.

    The authorization request carries a random ``nonce`` that is persisted
    through the strategy's association storage.  When the token response
    contains an ``id_token`` it is decoded as an HMAC-signed JWT (keyed with
    the client secret) and its ``iss``, ``aud``, ``exp``, ``iat`` and
    ``nonce`` claims are checked before the UserInfo endpoint is queried.
    """

    name = 'yahoojp-oidc'
    ID_KEY = 'user_id'
    AUTHORIZATION_URL = 'https://auth.login.yahoo.co.jp/yconnect/v1/authorization'
    ACCESS_TOKEN_URL = 'https://auth.login.yahoo.co.jp/yconnect/v1/token'
    USERINFO_URL = 'https://userinfo.yahooapis.jp/yconnect/v1/attribute'
    ID_TOKEN_ISS = 'https://auth.login.yahoo.co.jp'
    # Maximum accepted age of an id_token in seconds, measured from ``iat``.
    ID_TOKEN_MAX_AGE = 600
    RESPONSE_TYPE = 'code id_token'
    DEFAULT_SCOPE = ['openid']
    REDIRECT_STATE = False
    ACCESS_TOKEN_METHOD = 'POST'
    EXTRA_DATA = [
        ('access_token', 'access_token'),
        ('token_type', 'token_type'),
        ('expires_in', 'expires_in'),
        ('refresh_token', 'refresh_token'),
        ('id_token', 'id_token'),
        ('id_token_payload', 'id_token_payload'),
    ]

    # Supported JWT signing algorithms: ``alg`` header value -> callable
    # taking (signing_input, key) as text and returning the raw MAC bytes.
    _jwt_sign = {
        'HS256': lambda msg, key: hmac.new(key.encode(), msg.encode(), hashlib.sha256).digest(),
        'HS384': lambda msg, key: hmac.new(key.encode(), msg.encode(), hashlib.sha384).digest(),
        'HS512': lambda msg, key: hmac.new(key.encode(), msg.encode(), hashlib.sha512).digest(),
    }

    def auth_params(self, state):
        """Return the authorization request arguments, including a nonce.

        A 64-character random nonce is added to the parent's parameters and
        stored (together with ``state``) so it can be matched against the
        ``nonce`` claim of the id_token later.
        """
        params = super(YahoojpOidc, self).auth_params(state)
        nonce = self.strategy.random_string(64)
        params['nonce'] = nonce
        self._store_nonce(nonce, state)
        return params

    def auth_complete_params(self, state=None):
        """Return the body parameters of the authorization-code token request."""
        return {
            'grant_type': 'authorization_code',  # request auth code
            'code': self.data.get('code', ''),  # server response code
            'redirect_uri': self.get_redirect_uri(state),
        }

    def auth_headers(self):
        """Return the token-endpoint headers with HTTP Basic client credentials."""
        client_id, client_secret = self.get_key_and_secret()
        credentials = (client_id + ':' + client_secret).encode()
        # HTTP Basic uses the standard base64 alphabet *with* padding; the
        # URL-safe, unpadded variant belongs to JWT segments only.
        basic_auth = base64.b64encode(credentials).decode()
        return {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Authorization': 'Basic ' + basic_auth,
        }

    def user_data(self, access_token, *args, **kwargs):
        """Validate the id_token (if any) and load the user's attributes.

        ``kwargs['response']`` is expected to be the token response.  When it
        carries an ``id_token`` the decoded, validated payload is returned
        under ``'id_token_payload'``; the UserInfo response is merged on top.

        Raises:
            AuthTokenError: if the id_token is malformed or fails validation.
        """
        user_info = {}
        token_response = kwargs.get('response') or {}
        id_token = token_response.get('id_token')
        if id_token:
            user_info['id_token_payload'] = self._validate_id_token(id_token)

        # UserInfo Request
        attributes = self.get_json(
            self.USERINFO_URL,
            params={'schema': 'openid'},
            headers={'Authorization': 'Bearer ' + access_token},
        )
        user_info.update(attributes)
        return user_info

    def _validate_id_token(self, id_token):
        """Decode ``id_token`` and check its claims; return the payload dict.

        Raises:
            AuthTokenError: on a bad signature or an invalid/missing claim.
        """
        client_id, client_secret = self.get_key_and_secret()
        payload = self._jwt_decode(id_token, client_secret)
        if not isinstance(payload, dict):
            raise AuthTokenError(self, 'Incorrect id_token : Invalid payload')

        if payload.get('iss') != self.ID_TOKEN_ISS:
            raise AuthTokenError(self, 'Incorrect id_token : iss')

        # ``aud`` may be a single string or an array of audiences.
        audience = payload.get('aud')
        if not isinstance(audience, (list, tuple)):
            audience = [audience]
        if client_id not in audience:
            raise AuthTokenError(self, 'Incorrect id_token : aud')

        utc_timestamp = timegm(datetime.utcnow().utctimetuple())
        # Written as ``not x >= y`` so a NaN claim is rejected as well.
        if not self._numeric_claim(payload, 'exp') >= utc_timestamp:
            raise AuthTokenError(self, 'Incorrect id_token : exp')
        if not self._numeric_claim(payload, 'iat') >= utc_timestamp - self.ID_TOKEN_MAX_AGE:
            raise AuthTokenError(self, 'Incorrect id_token : iat')

        nonce = payload.get('nonce')
        nonce_obj = self._get_nonce(nonce) if nonce else None
        if not nonce_obj:
            raise AuthTokenError(self, 'Incorrect id_token : nonce')
        # A nonce is single-use: drop it once it has been matched.
        self._remove_nonce(nonce_obj.id)
        return payload

    def _numeric_claim(self, payload, claim):
        """Return ``payload[claim]`` as a float or raise AuthTokenError."""
        try:
            return float(payload[claim])
        except (KeyError, TypeError, ValueError):
            raise AuthTokenError(self, 'Incorrect id_token : ' + claim)

    def get_user_details(self, response):
        """Map the provider's attributes onto python-social-auth user details."""
        return {
            'username': response.get('user_id', ''),
            'email': response.get('email', ''),
            'fullname': response.get('name', ''),
            'first_name': response.get('given_name', ''),
            'last_name': response.get('family_name', ''),
        }

    def refresh_token_params(self, token, *args, **kwargs):
        """Return the body parameters of a refresh-token request."""
        return {
            'refresh_token': token,
            'grant_type': 'refresh_token',
        }

    def refresh_token(self, token, *args, **kwargs):
        """Refresh the access token and turn error responses into exceptions.

        Raises:
            AuthCanceled: if the provider answers ``invalid_grant``.
            AuthFailed: for any other reported error.
        """
        # Forward the extra arguments instead of silently dropping them.
        response = super(YahoojpOidc, self).refresh_token(token, *args, **kwargs)
        error = response.get('error_description') or response.get('error')
        if error:
            # invalid_grant (401) - Refresh token has expired.
            if response.get('error') == 'invalid_grant':
                raise AuthCanceled(self, error)
            raise AuthFailed(self, error)
        return response

    def request(self, url, method='GET', *args, **kwargs):
        """Perform an HTTP request without raising on 4xx responses.

        Client-error responses are returned as-is so that callers can read
        the ``error`` fields from the body; every other error status is
        raised through ``raise_for_status()``.
        """
        kwargs.setdefault('timeout', self.setting('REQUESTS_TIMEOUT') or
                          self.setting('URLOPEN_TIMEOUT'))
        response = request(method, url, *args, **kwargs)
        if not 400 <= response.status_code < 500:
            response.raise_for_status()
        return response

    def _store_nonce(self, nonce, state):
        """Persist ``nonce`` (with ``state``) in the association storage."""
        server_url = self.AUTHORIZATION_URL
        association = YahoojpOidcAssociation(nonce, assoc_type=state)
        self.strategy.storage.association.store(server_url, association)

    def _get_nonce(self, nonce):
        """Return the stored association for ``nonce``, or None if not found."""
        server_url = self.AUTHORIZATION_URL
        try:
            return self.strategy.storage.association.get(
                server_url=server_url, handle=nonce)[0]
        except Exception:
            # Best effort: a missing entry or a storage failure both mean
            # the nonce cannot be confirmed.
            return None

    def _remove_nonce(self, id):
        """Delete the stored association with the given id (best effort)."""
        try:
            self.strategy.storage.association.remove([id])
        except Exception:
            return None

    def _jwt_decode(self, jwt, key=''):
        """Verify the HMAC signature of ``jwt`` and return its decoded payload.

        Args:
            jwt: compact-serialized token (``header.payload.signature``).
            key: shared secret used to compute the expected signature.

        Raises:
            AuthTokenError: if the token is malformed, uses an algorithm not
                listed in ``_jwt_sign``, or the signature does not match.
        """
        try:
            signing_input, crypto_segment = str(jwt).rsplit('.', 1)
            header_segment, payload_segment = signing_input.split('.', 1)
        except ValueError:
            raise AuthTokenError(self, 'Incorrect id_token : Not enough segments')

        try:
            header = json.loads(self._base64url_decode(header_segment))
        except TypeError:
            raise AuthTokenError(self, 'Incorrect id_token : Invalid header padding')
        except ValueError as e:
            raise AuthTokenError(self, 'Incorrect id_token : Invalid header string: %s' % e)

        try:
            payload = json.loads(self._base64url_decode(payload_segment))
        except TypeError:
            raise AuthTokenError(self, 'Incorrect id_token : Invalid payload padding')
        except ValueError as e:
            raise AuthTokenError(self, 'Incorrect id_token : Invalid payload string: %s' % e)

        try:
            # Always raw bytes: the signature is binary data and must never
            # be turned into text, even if it happens to be valid UTF-8.
            signature = self._base64url_decode_raw(crypto_segment)
        except (TypeError, ValueError):
            # binascii.Error (bad base64 on Python 3) is a ValueError.
            raise AuthTokenError(self, 'Incorrect id_token : Invalid crypto padding')

        try:
            signer = self._jwt_sign[header['alg']]
        except (KeyError, TypeError):
            # TypeError covers a header that is not a JSON object.
            raise AuthTokenError(self, 'Incorrect id_token : Algorithm not supported')

        # Constant-time comparison to avoid leaking the MAC through timing.
        if not hmac.compare_digest(signature, signer(signing_input, key)):
            raise AuthTokenError(self, 'Incorrect id_token : Signature verification failed')
        return payload

    def _base64url_decode_raw(self, input):
        """Decode unpadded URL-safe base64 text and return the raw bytes."""
        rem = len(input) % 4
        if rem > 0:
            input += '=' * (4 - rem)
        return base64.urlsafe_b64decode(input.encode())

    def _base64url_decode(self, input):
        """Decode unpadded URL-safe base64 text.

        Returns the decoded data as text when it is valid in the default
        encoding, otherwise the raw bytes.
        """
        raw = self._base64url_decode_raw(input)
        try:
            return raw.decode()  # return str
        except UnicodeDecodeError:
            return raw  # return byte
class YahoojpOidcAssociation(object):
    """Minimal association object used only to persist a nonce.

    It carries the attributes set below so that it can be handed to the
    association storage; only ``handle`` and ``assoc_type`` hold meaningful
    data.

    Args:
        handle: the nonce value.
        secret: unused; stored as bytes (text is encoded, bytes kept as-is).
        issued: unused.
        lifetime: unused.
        assoc_type: the OAuth ``state`` the nonce was issued with.
    """

    def __init__(self, handle, secret='', issued=0, lifetime=0, assoc_type=''):
        self.handle = handle  # as nonce
        # Accept an already-encoded secret instead of failing on bytes.
        self.secret = secret if isinstance(secret, bytes) else secret.encode()  # not use
        self.issued = issued  # not use
        self.lifetime = lifetime  # not use
        self.assoc_type = assoc_type  # as state
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment