Skip to content

Instantly share code, notes, and snippets.

Last active May 2, 2022 16:26
Show Gist options
  • Save clintonb/6ee13e39ca6cc5c56c49 to your computer and use it in GitHub Desktop.
Save clintonb/6ee13e39ca6cc5c56c49 to your computer and use it in GitHub Desktop.
python-social-auth OpenID Connect Backend
This file contains Django authentication backends. For more information visit
from calendar import timegm
from datetime import datetime
from django.conf import settings
import jwt
from social.backends.oauth import BaseOAuth2
# pylint: disable=abstract-method
from social.exceptions import AuthTokenError
class EdXOAuth2(BaseOAuth2):
name = 'edx-oauth2'
AUTHORIZATION_URL = '{0}/authorize/'.format(settings.SOCIAL_AUTH_EDX_OAUTH2_URL_ROOT)
ACCESS_TOKEN_URL = '{0}/access_token/'.format(settings.SOCIAL_AUTH_EDX_OAUTH2_URL_ROOT)
ID_KEY = 'username'
('username', 'id'),
('code', 'code'),
('expires_in', 'expires'),
('refresh_token', 'refresh_token', True),
def get_user_details(self, response):
"""Return user details from edX account"""
return {
'username': response.get('username'),
'email': '',
'fullname': '',
'first_name': '',
'last_name': ''
class OpenIdConnectAssociation(object):
""" Use Association model to save the nonce by force. """
def __init__(self, handle, secret='', issued=0, lifetime=0, assoc_type=''):
self.handle = handle # as nonce
self.secret = secret.encode() # not use
self.issued = issued # not use
self.lifetime = lifetime # not use
self.assoc_type = assoc_type # as state
class OpenIdConnectAuth(BaseOAuth2):
DEFAULT_SCOPE = ['openid']
('access_token', 'access_token'),
('token_type', 'token_type'),
('expires_in', 'expires_in'),
('refresh_token', 'refresh_token'),
('id_token', 'id_token')
def auth_params(self, state=None):
"""Return extra arguments needed on auth process."""
params = super(OpenIdConnectAuth, self).auth_params(state)
params['nonce'] = self._get_and_store_nonce(self.AUTHORIZATION_URL, state)
return params
def auth_complete_params(self, state=None):
params = super(OpenIdConnectAuth, self).auth_complete_params(state)
# Add a nonce to the request so that to help counter CSRF
params['nonce'] = self._get_and_store_nonce(self.ACCESS_TOKEN_URL, state)
return params
def _get_and_store_nonce(self, url, state):
# Create a nonce
nonce = self.strategy.random_string(64)
# Store the nonce
association = OpenIdConnectAssociation(nonce, assoc_type=state), association)
return nonce
def _get_nonce(self, nonce):
server_url = self.ACCESS_TOKEN_URL
return, handle=nonce)[0]
except: # pylint: disable=bare-except
return None
def _remove_nonce(self, nonce_id):
except: # pylint: disable=bare-except
return None
def _validate_and_return_id_token(self, id_token):
client_id, _client_secret = self.get_key_and_secret()
# Decode the JWT and raise an error if the secret is invalid or
# the response has expired.
decryption_key = self.setting('ID_TOKEN_DECRYPTION_KEY')
id_token = jwt.decode(id_token, decryption_key)
except (jwt.DecodeError, jwt.ExpiredSignature) as de:
raise AuthTokenError(self, de)
# Verify the issuer of the id_token is correct
if id_token['iss'] != self.ID_TOKEN_ISSUER:
raise AuthTokenError(self, 'Incorrect id_token: iss')
# Verify the token was issued in the last 10 minutes
utc_timestamp = timegm(datetime.utcnow().utctimetuple())
if id_token['iat'] < (utc_timestamp - 600):
raise AuthTokenError(self, 'Incorrect id_token: iat')
# Verify this client is the correct recipient of the id_token
aud = id_token.get('aud')
if aud != client_id:
raise AuthTokenError(self, 'Incorrect id_token: aud')
# Validate the nonce to ensure the request was not modified
nonce = id_token.get('nonce')
if not nonce:
raise AuthTokenError(self, 'Incorrect id_token: nonce')
nonce_obj = self._get_nonce(id_token['nonce'])
if nonce_obj:
raise AuthTokenError(self, 'Incorrect id_token: nonce')
return id_token
class EdXOpenIdConnect(OpenIdConnectAuth):
name = 'edx-oidc'
AUTHORIZATION_URL = '{0}/authorize/'.format(settings.SOCIAL_AUTH_EDX_OIDC_URL_ROOT)
ACCESS_TOKEN_URL = '{0}/access_token/'.format(settings.SOCIAL_AUTH_EDX_OIDC_URL_ROOT)
def user_data(self, access_token, *args, **kwargs):
return self._validate_and_return_id_token(kwargs['response'].get('id_token'))
def get_user_details(self, response):
return {
u'username': response['username'],
u'email': response['email'],
u'full_name': response['name'],
u'first_name': response['given_name'],
u'last_name': response['family_name']
from calendar import timegm
import json
import datetime
from django.conf import settings
import jwt
from social.exceptions import AuthTokenError
from social.tests.backends.oauth import OAuth2Test
class EdXOAuth2Tests(OAuth2Test):
backend_path = 'analytics_dashboard.backends.EdXOAuth2'
expected_username = 'edx'
access_token_body = json.dumps({
'access_token': 'foobar',
'token_type': 'bearer',
'username': 'edx'
def test_login(self):
def test_partial_pipeline(self):
class OpenIdConnectTestMixin(object):
Mixin to test OpenID Connect consumers. Inheriting classes should also inherit OAuth2Test.
expected_username = u'edx'
client_key = 'a-key'
client_secret = 'a-secret-key'
issuer = None # id_token issuer
def setUp(self):
super(OpenIdConnectTestMixin, self).setUp()
self.access_token_body = self._parse_nonce_and_return_access_token_body
def extra_settings(self):
xs = super(OpenIdConnectTestMixin, self).extra_settings()
'SOCIAL_AUTH_{}_KEY'.format( self.client_key,
'SOCIAL_AUTH_{}_SECRET'.format( self.client_secret,
'SOCIAL_AUTH_{}_ID_TOKEN_DECRYPTION_KEY'.format( self.client_secret
return xs
def _parse_nonce_and_return_access_token_body(self, request, _url, headers):
Get the nonce from the request parameters, add it to the id_token, and return the complete response.
body = self._prepare_access_token_body(nonce=request.parsed_body[u'nonce'][0])
return 200, headers, body
def _prepare_access_token_body(self, client_key=None, client_secret=None, expiration_datetime=None,
issue_datetime=None, nonce=None):
Prepares a provider access token response
client_id (str) -- OAuth ID for the client that requested authentication.
client_secret (str) -- OAuth secret for the client that requested authentication.
expiration_time (datetime) -- Date and time after which the response should be considered invalid.
body = {'access_token': 'foobar', 'token_type': 'bearer'}
client_key = client_key or self.client_key
client_secret = client_secret or self.client_secret
now = datetime.datetime.utcnow()
expiration_datetime = expiration_datetime or (now + datetime.timedelta(seconds=30))
issue_datetime = issue_datetime or now
nonce = nonce or None
id_token = {
u'iss': self.issuer,
u'nonce': nonce,
u'aud': client_key,
u'azp': client_key,
u'exp': timegm(expiration_datetime.utctimetuple()),
u'iat': timegm(issue_datetime.utctimetuple()),
u'username': self.expected_username,
u'name': u'Ed Xavier',
u'given_name': u'Ed',
u'family_name': u'Xavier',
u'email': u''
body[u'id_token'] = jwt.encode(id_token, client_secret)
return json.dumps(body)
def test_login(self):
def test_partial_pipeline(self):
def assertAutTokenErrorRaised(self, expected_message, **access_token_kwargs):
self.access_token_body = self._prepare_access_token_body(**access_token_kwargs)
self.assertRaisesRegexp(AuthTokenError, expected_message, self.do_login)
def test_invalid_secret(self):
self.assertAutTokenErrorRaised('Token error: Signature verification failed', client_secret='wrong!')
def test_expired_signature(self):
expiration_datetime = datetime.datetime.utcnow() - datetime.timedelta(seconds=30)
self.assertAutTokenErrorRaised('Token error: Signature has expired', expiration_datetime=expiration_datetime)
def test_invalid_audience(self):
self.assertAutTokenErrorRaised('Token error: Incorrect id_token: aud', client_key='someone-else')
def test_invalid_issue_time(self):
expiration_datetime = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
self.assertAutTokenErrorRaised('Token error: Incorrect id_token: iat', issue_datetime=expiration_datetime)
def test_invalid_nonce(self):
self.assertAutTokenErrorRaised('Token error: Incorrect id_token: nonce', nonce='something-wrong')
class EdXOpenIdConnectTests(OpenIdConnectTestMixin, OAuth2Test):
backend_path = 'analytics_dashboard.backends.EdXOpenIdConnect'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment