Last active
September 29, 2018 21:06
-
-
Save andybeak/9c0163053c5c0296770b8968d0ddf1aa to your computer and use it in GitHub Desktop.
LinkedIn article on Django and Azure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_access_token(self, request):
    """Extract the bearer token from the request's Authorization header.

    Args:
        request: Object exposing a ``META`` mapping (a Django ``HttpRequest``).

    Returns:
        The token string that follows the ``Bearer`` scheme.

    Raises:
        ValueError: If the header is missing, uses another scheme, or
            carries no token.
    """
    # A missing header is reported the same way as a malformed one, so the
    # caller only has to handle a single exception type.
    authorization_header = request.META.get('HTTP_AUTHORIZATION', '')
    scheme, _, token = authorization_header.partition(' ')
    token = token.strip()
    # HTTP authentication scheme names are case-insensitive.
    if scheme.lower() != self.AUTH_TYPE_PREFIX.lower() or not token:
        raise ValueError('Invalid token')
    return token
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"token_type": "Bearer", | |
"expires_in": "3599", | |
"ext_expires_in": "0", | |
"expires_on": "1538253592", | |
"not_before": "1538249692", | |
"resource": "2f50eb0f-b4f7-4a33-83d6-2223f3a7c61a", | |
"access_token": "JWT token" | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def __call__(self, request):
    """Authenticate the request, then hand it to the rest of the stack.

    Args:
        request: The incoming Django request.

    Returns:
        A 401 response when the access token cannot be extracted or
        verified; otherwise whatever the wrapped view/middleware returns.
    """
    # Code to be executed for each request before
    # the view (and later middleware) are called.
    try:
        access_token = self.get_access_token(request)
        self.check_access_token(access_token)
    except Exception:
        # Fail closed: any problem while authenticating means "no access".
        return HttpResponse('Unauthorized', status=401)

    # The view runs outside the try block so that its own exceptions are
    # not misreported as authentication failures.
    response = self.get_response(request)

    # Code to be executed for each request/response after
    # the view is called.
    return response
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def check_access_token(self, access_token):
    """Verify the signature and audience of a JWT access token.

    Args:
        access_token: The encoded JWT taken from the Authorization header.

    Returns:
        The decoded claims returned by ``jwt.decode``.

    Raises:
        jwt.InvalidTokenError: If the token is malformed, expired, has the
            wrong audience, or fails signature verification.
    """
    # The header is read *before* verification only to learn which signing
    # key to fetch; nothing else in it is trusted.
    token_header = jwt.get_unverified_header(access_token)
    public_key = self.get_public_key(token_header)
    return jwt.decode(
        access_token,
        public_key,
        # Pin the algorithm on the server side. Taking it from the token
        # header would let the sender choose how the token is verified.
        algorithms=['RS256'],
        audience=settings.OUATH2_AUDIENCE,
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging

import jwt
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate
from django.conf import settings
from django.core.cache import cache
from django.http import HttpResponse


class AccessTokenMiddleware:
    """Django middleware that only lets requests with a valid bearer JWT through.

    The token is read from the ``Authorization`` header and verified against
    the signing certificate published in the JWKS document referenced by
    ``settings.OAUTH2_SETTINGS_URL``. Requests that fail verification get a
    ``401 Unauthorized`` response and never reach the view.
    """

    AUTH_TYPE_PREFIX = 'Bearer'
    PUBLIC_KEY_CACHEKEY = 'public_key'
    TWELVE_HOURS = 43200
    # Signature algorithms accepted from tokens; never taken from the token.
    ALLOWED_ALGORITHMS = ['RS256']
    # Seconds to wait for the metadata / JWKS endpoints before giving up.
    HTTP_TIMEOUT = 10

    _logger = logging.getLogger(__name__)

    def __init__(self, get_response):
        """Store the next callable in the middleware chain."""
        self.get_response = get_response
        # One-time configuration and initialization.

    def __call__(self, request):
        """Authenticate the request, then hand it to the rest of the stack.

        Returns:
            A 401 response when the access token cannot be extracted or
            verified; otherwise whatever the wrapped view/middleware returns.
        """
        # Code to be executed for each request before
        # the view (and later middleware) are called.
        try:
            access_token = self.get_access_token(request)
            self.check_access_token(access_token)
        except (jwt.InvalidTokenError, ValueError) as exc:
            # Expected rejections: bad header, bad signature, expired, etc.
            self._logger.info('Access token rejected: %s', exc)
            return HttpResponse('Unauthorized', status=401)
        except Exception:
            # Unexpected failure (e.g. key download). Fail closed, and keep
            # the details in the server log instead of the response body.
            self._logger.exception('Access token verification failed')
            return HttpResponse('Unauthorized', status=401)

        # The view runs outside the try block so that its own exceptions
        # are not misreported as authentication failures.
        response = self.get_response(request)

        # Code to be executed for each request/response after
        # the view is called.
        return response

    def get_access_token(self, request):
        """Extract the bearer token from the request's Authorization header.

        Raises:
            ValueError: If the header is missing, uses another scheme, or
                carries no token.
        """
        authorization_header = request.META.get('HTTP_AUTHORIZATION', '')
        scheme, _, token = authorization_header.partition(' ')
        token = token.strip()
        # HTTP authentication scheme names are case-insensitive.
        if scheme.lower() != self.AUTH_TYPE_PREFIX.lower() or not token:
            raise ValueError('Invalid token')
        return token

    def check_access_token(self, access_token):
        """Verify the signature and audience of a JWT access token.

        Returns:
            The decoded claims returned by ``jwt.decode``.

        Raises:
            jwt.InvalidTokenError: If the token is malformed, expired, has
                the wrong audience, or fails signature verification.
            ValueError: If no signing key can be found for the token.
        """
        # The header is read before verification only to learn which
        # signing key to fetch; nothing else in it is trusted.
        token_header = jwt.get_unverified_header(access_token)
        public_key = self.get_public_key(token_header)
        return jwt.decode(
            access_token,
            public_key,
            algorithms=self.ALLOWED_ALGORITHMS,
            audience=settings.OUATH2_AUDIENCE,
        )

    def get_public_key(self, token_header):
        """Return the public key for the token's ``kid``, using the cache.

        Certificates are cached per key id, so a token signed with a
        different published key is not checked against a stale certificate.

        Raises:
            ValueError: If the token header carries no usable ``kid``.
        """
        kid = token_header.get('kid')
        if not kid or not isinstance(kid, str):
            raise ValueError('Token header has no key id')
        cache_key = '{}:{}'.format(self.PUBLIC_KEY_CACHEKEY, kid)
        cert = cache.get(cache_key)
        if cert is None:
            cert = self.fetch_azure_signature(token_header)
            cache.set(cache_key, cert, self.TWELVE_HOURS)
        certificate = load_pem_x509_certificate(cert.encode(), default_backend())
        return certificate.public_key()

    def fetch_azure_signature(self, token_header):
        """Download the PEM certificate matching the token's ``kid``.

        Returns:
            The first ``x5c`` entry of the matching JWK, wrapped in PEM
            certificate markers.

        Raises:
            ValueError: If the token has no ``kid`` or no published key
                matches it.
            requests.RequestException: If either HTTP request fails.
        """
        kid = token_header.get('kid')
        if not kid:
            raise ValueError('Token header has no key id')

        # Fetch Azure OpenID configuration document
        res = requests.get(settings.OAUTH2_SETTINGS_URL, timeout=self.HTTP_TIMEOUT)
        res.raise_for_status()
        jwk_uri = res.json()['jwks_uri']

        # From Azure configuration document find where the keys are
        res = requests.get(jwk_uri, timeout=self.HTTP_TIMEOUT)
        res.raise_for_status()

        # Iterate JWK keys and extract matching x5c chain
        for key in res.json()['keys']:
            if key.get('kid') == kid and key.get('x5c'):
                # It is already base64 encoded, just wrap it as a certificate
                return ''.join([
                    '-----BEGIN CERTIFICATE-----\n',
                    key['x5c'][0],
                    '\n-----END CERTIFICATE-----\n',
                ])
        raise ValueError('No signing key matches the token key id')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
curl -X POST \
  https://login.microsoftonline.com/<tenant id>/oauth2/token \
  -F grant_type=client_credentials \
  -F client_id=<client application id> \
  -F client_secret=<client secret key> \
  -F resource=<server application id>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_public_key(self, token_header):
    """Return the public key for the token's ``kid``, using the cache.

    Certificates are cached per key id, so a token signed with a different
    published key is not checked against a stale certificate.

    Args:
        token_header: The (unverified) JWT header as a dict.

    Returns:
        The public key object extracted from the PEM certificate.

    Raises:
        ValueError: If the token header carries no usable ``kid``.
    """
    kid = token_header.get('kid')
    if not kid or not isinstance(kid, str):
        raise ValueError('Token header has no key id')
    cache_key = '{}:{}'.format(self.PUBLIC_KEY_CACHEKEY, kid)
    cert = cache.get(cache_key)
    if cert is None:
        cert = self.fetch_azure_signature(token_header)
        cache.set(cache_key, cert, self.TWELVE_HOURS)
    certificate = load_pem_x509_certificate(cert.encode(), default_backend())
    return certificate.public_key()
def fetch_azure_signature(self, token_header):
    """Download the PEM certificate matching the token's ``kid``.

    Args:
        token_header: The (unverified) JWT header as a dict.

    Returns:
        The first ``x5c`` entry of the matching JWK, wrapped in PEM
        certificate markers.

    Raises:
        ValueError: If the token has no ``kid`` or no published key
            matches it.
        requests.RequestException: If either HTTP request fails.
    """
    kid = token_header.get('kid')
    if not kid:
        raise ValueError('Token header has no key id')

    # Bound the wait so a slow endpoint cannot hang the request forever.
    timeout = 10

    # Fetch Azure OpenID configuration document
    res = requests.get(settings.OAUTH2_SETTINGS_URL, timeout=timeout)
    res.raise_for_status()
    jwk_uri = res.json()['jwks_uri']

    # From Azure configuration document find where the keys are
    res = requests.get(jwk_uri, timeout=timeout)
    res.raise_for_status()

    # Iterate JWK keys and extract matching x5c chain
    for key in res.json()['keys']:
        if key.get('kid') == kid and key.get('x5c'):
            # It is already base64 encoded, just wrap it as a certificate
            return ''.join([
                '-----BEGIN CERTIFICATE-----\n',
                key['x5c'][0],
                '\n-----END CERTIFICATE-----\n',
            ])
    raise ValueError('No signing key matches the token key id')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"aud": "<your api application id>", | |
"iss": "https://sts.windows.net/651c54ee-ecc6-49af-87a5-b83e93bd1519/", | |
"iat": 1538249692, | |
"nbf": 1538249692, | |
"exp": 1538253592, | |
"aio": "42BgYEgSW5MV4i58UDPvAqPPJ3djAA==", | |
"appid": "cb974946-528a-4168-97c4-b4d11178f5dd", | |
"appidacr": "1", | |
"idp": "https://sts.windows.net/651c54ee-ecc6-49af-87a5-b83e93bd1519/", | |
"oid": "6f47c4af-865a-41a9-8838-3b481506c86c", | |
"sub": "6f47c4af-865a-41a9-8838-3b481506c86c", | |
"tid": "651c54ee-ecc6-49af-87a5-b83e93bd1519", | |
"uti": "SLQQHbUpLEqNYFSAsKZSAQ", | |
"ver": "1.0" | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ---------------------------------------------------------------------------
# OAuth2 related settings
# ---------------------------------------------------------------------------
# Expected "aud" claim of incoming access tokens.
# NOTE: the name is spelled OUATH2_AUDIENCE (sic). The middleware reads
# settings.OUATH2_AUDIENCE, so any rename has to happen in both places.
OUATH2_AUDIENCE = '<Your api application id>'

# OpenID Connect discovery document; its "jwks_uri" points at the signing keys.
OAUTH2_SETTINGS_URL = 'https://login.microsoftonline.com/common/.well-known/openid-configuration'

# ---------------------------------------------------------------------------
# Caching
# ---------------------------------------------------------------------------
# NOTE(review): the MemcachedCache backend was deprecated in Django 3.2 and
# removed in 4.1 - confirm the project's Django version before upgrading.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': '127.0.0.1:11211',
    },
}

# ---------------------------------------------------------------------------
# Middleware
# ---------------------------------------------------------------------------
# Register the custom middleware to verify the access token. It sits right
# after SecurityMiddleware, ahead of the session/auth middleware.
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'api.middleware.checkaccesstoken.AccessTokenMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment