LinkedIn article on Django and Azure
def get_access_token(self, request): | |
authorization_header = request.META['HTTP_AUTHORIZATION'] | |
bearer, _, token = authorization_header.partition(' ') | |
if bearer != self.AUTH_TYPE_PREFIX: | |
raise ValueError('Invalid token') | |
return token |
{ | |
"token_type": "Bearer", | |
"expires_in": "3599", | |
"ext_expires_in": "0", | |
"expires_on": "1538253592", | |
"not_before": "1538249692", | |
"resource": "2f50eb0f-b4f7-4a33-83d6-2223f3a7c61a", | |
"access_token": "JWT token" | |
} |
def __call__(self, request):# Code to be executed for each request before# the view (and later middleware) are called.try: | |
access_token = self.get_access_token(request) | |
self.check_access_token(access_token) | |
response = self.get_response(request) | |
except (jwt.DecodeError, ValueError) as e: | |
response = HttpResponse('Unauthorized', status=401) | |
except Exception as e: | |
response = HttpResponse('Unauthorized', status=401) | |
# Code to be executed for each request/response after# the view is called. | |
return response |
def check_access_token(self, access_token): | |
token_header = jwt.get_unverified_header(access_token) | |
public_key = self.get_public_key(token_header) | |
jwt.decode( | |
access_token, | |
public_key, | |
algorithms=token_header['alg'], | |
audience=settings.OUATH2_AUDIENCE, | |
) |
from django.conf import settings | |
from django.http import HttpResponse | |
from django.core.cache import cache | |
import jwt | |
import requests | |
from cryptography.x509 import load_pem_x509_certificate | |
from cryptography.hazmat.backends import default_backend | |
class AccessTokenMiddleware: | |
AUTH_TYPE_PREFIX = 'Bearer' | |
PUBLIC_KEY_CACHEKEY = 'public_key' | |
TWELVE_HOURS = 43200 | |
def __init__(self, get_response): | |
self.get_response = get_response | |
# One-time configuration and initialization. | |
def __call__(self, request): | |
# Code to be executed for each request before | |
# the view (and later middleware) are called. | |
try: | |
access_token = self.get_access_token(request) | |
self.check_access_token(access_token) | |
response = self.get_response(request) | |
except (jwt.DecodeError, ValueError) as e: | |
response = HttpResponse('Unauthorized', status=401) | |
except Exception as e: | |
response = HttpResponse('Unauthorized [' + str(e) + ']', status=401) | |
# Code to be executed for each request/response after | |
# the view is called. | |
return response | |
def get_access_token(self, request): | |
authorization_header = request.META['HTTP_AUTHORIZATION'] | |
bearer, _, token = authorization_header.partition(' ') | |
if bearer != self.AUTH_TYPE_PREFIX: | |
raise ValueError('Invalid token') | |
return token | |
def check_access_token(self, access_token): | |
token_header = jwt.get_unverified_header(access_token) | |
public_key = self.get_public_key(token_header) | |
jwt.decode( | |
access_token, | |
public_key, | |
algorithms=token_header['alg'], | |
audience=settings.OUATH2_AUDIENCE, | |
) | |
def get_public_key(self, token_header): | |
cert = cache.get(self.PUBLIC_KEY_CACHEKEY) | |
if cert is None: | |
cert = self.fetch_azure_signature(token_header) | |
cache.set(self.PUBLIC_KEY_CACHEKEY, cert, self.TWELVE_HOURS) | |
public_key = load_pem_x509_certificate(cert.encode(), default_backend()).public_key() | |
return public_key | |
def fetch_azure_signature(self, token_header): | |
# Fetch Azure OpenID configuration document | |
res = requests.get(settings.OAUTH2_SETTINGS_URL) | |
jwk_uri = res.json()['jwks_uri'] | |
# From Azure configuration document find where the keys are | |
res = requests.get(jwk_uri) | |
jwk_keys = res.json() | |
# Iterate JWK keys and extract matching x5c chain | |
x5c = None | |
for key in jwk_keys['keys']: | |
if key['kid'] == token_header['kid']: | |
x5c = key['x5c'] | |
# It is already base64 encoded, just wrap it as a certificate | |
cert = ''.join([ | |
'-----BEGIN CERTIFICATE-----\n', | |
x5c[0], | |
'\n-----END CERTIFICATE-----\n', | |
]) | |
return cert |
curl -X POST \ | |
https://login.microsoftonline.com/<tenant> id/oauth2/token \ | |
-F grant_type=client_credentials \ | |
-F client_id=<client application id> \ | |
-F client_secret=<client secret key> \ | |
-F resource=<server application id> |
def get_public_key(self, token_header): | |
cert = cache.get(self.PUBLIC_KEY_CACHEKEY) | |
if cert is None: | |
cert = self.fetch_azure_signature(token_header) | |
cache.set(self.PUBLIC_KEY_CACHEKEY, cert, self.TWELVE_HOURS) | |
public_key = load_pem_x509_certificate(cert.encode(), default_backend()).public_key() | |
return public_key | |
def fetch_azure_signature(self, token_header):# Fetch Azure OpenID configuration document | |
res = requests.get(settings.OAUTH2_SETTINGS_URL) | |
jwk_uri = res.json()['jwks_uri'] | |
# From Azure configuration document find where the keys are | |
res = requests.get(jwk_uri) | |
jwk_keys = res.json() | |
# Iterate JWK keys and extract matching x5c chain | |
x5c = Nonefor key in jwk_keys['keys']: | |
if key['kid'] == token_header['kid']: | |
x5c = key['x5c'] | |
# It is already base64 encoded, just wrap it as a certificate | |
cert = ''.join([ | |
'-----BEGIN CERTIFICATE-----\n', | |
x5c[0], | |
'\n-----END CERTIFICATE-----\n', | |
]) | |
return cert |
{ | |
"aud": "<your api application id>", | |
"iss": "https://sts.windows.net/651c54ee-ecc6-49af-87a5-b83e93bd1519/", | |
"iat": 1538249692, | |
"nbf": 1538249692, | |
"exp": 1538253592, | |
"aio": "42BgYEgSW5MV4i58UDPvAqPPJ3djAA==", | |
"appid": "cb974946-528a-4168-97c4-b4d11178f5dd", | |
"appidacr": "1", | |
"idp": "https://sts.windows.net/651c54ee-ecc6-49af-87a5-b83e93bd1519/", | |
"oid": "6f47c4af-865a-41a9-8838-3b481506c86c", | |
"sub": "6f47c4af-865a-41a9-8838-3b481506c86c", | |
"tid": "651c54ee-ecc6-49af-87a5-b83e93bd1519", | |
"uti": "SLQQHbUpLEqNYFSAsKZSAQ", | |
"ver": "1.0" | |
} |
# OAuth2 related settings | |
OUATH2_AUDIENCE = '<Your api application id>' | |
OAUTH2_SETTINGS_URL = 'https://login.microsoftonline.com/common/.well-known/openid-configuration' | |
# Caching | |
CACHES = { | |
'default': { | |
'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache', | |
'LOCATION': '127.0.0.1:11211', | |
} | |
} | |
# Register the custom middleware to verify the access token | |
MIDDLEWARE = [ | |
'django.middleware.security.SecurityMiddleware', | |
'api.middleware.checkaccesstoken.AccessTokenMiddleware', | |
'django.contrib.sessions.middleware.SessionMiddleware', | |
'django.middleware.common.CommonMiddleware', | |
'django.middleware.csrf.CsrfViewMiddleware', | |
'django.contrib.auth.middleware.AuthenticationMiddleware', | |
'django.contrib.messages.middleware.MessageMiddleware', | |
'django.middleware.clickjacking.XFrameOptionsMiddleware', | |
] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment