LinkedIn article on Django and Azure
def get_access_token(self, request):
    """Extract the bearer token from the request's Authorization header.

    Args:
        request: Object exposing a ``META`` mapping; the header is read
            from ``META['HTTP_AUTHORIZATION']``.

    Returns:
        The text that follows the auth-type prefix, with surrounding
        whitespace removed.

    Raises:
        ValueError: If the header is absent or empty, its scheme differs
            from ``self.AUTH_TYPE_PREFIX``, or no token follows the scheme.
    """
    # A missing header is reported with the same ValueError as a malformed
    # one, so callers handle every bad-header case through a single path.
    authorization_header = request.META.get('HTTP_AUTHORIZATION')
    if not authorization_header:
        raise ValueError('Invalid token')

    bearer, _, token = authorization_header.partition(' ')
    token = token.strip()
    if bearer != self.AUTH_TYPE_PREFIX or not token:
        raise ValueError('Invalid token')
    return token
"token_type": "Bearer",
"expires_in": "3599",
"ext_expires_in": "0",
"expires_on": "1538253592",
"not_before": "1538249692",
"resource": "2f50eb0f-b4f7-4a33-83d6-2223f3a7c61a",
"access_token": "JWT token"
def __call__(self, request):
    """Authenticate the request, then hand it to the next handler.

    Args:
        request: The incoming request object.

    Returns:
        The response from ``self.get_response(request)`` when the token is
        accepted, otherwise a 401 ``HttpResponse`` with body
        ``'Unauthorized'``.
    """
    # Code to be executed for each request before
    # the view (and later middleware) are called.
    try:
        access_token = self.get_access_token(request)
        # Without this call the extracted token would never be validated.
        self.check_access_token(access_token)
    except (jwt.DecodeError, ValueError):
        # Malformed header or undecodable token.
        return HttpResponse('Unauthorized', status=401)
    except Exception:
        # Any other failure while authenticating is also rejected, so the
        # middleware fails closed.
        return HttpResponse('Unauthorized', status=401)

    # Only authentication is guarded above, so failures raised further down
    # the chain are not reported as 401.
    # NOTE(review): confirm nothing relied on downstream errors becoming 401.
    response = self.get_response(request)
    # Code to be executed for each request/response after
    # the view is called.
    return response
def check_access_token(self, access_token):
    """Verify the token's signature and claims with the matching public key.

    Args:
        access_token: The encoded JWT taken from the Authorization header.

    Returns:
        The decoded claims returned by ``jwt.decode``.

    Raises:
        jwt.DecodeError: If the token cannot be parsed.
        Exception: Any other error raised by ``jwt.decode`` or by
            ``self.get_public_key``.
    """
    # The header is read unverified only to learn which key signed the token.
    token_header = jwt.get_unverified_header(access_token)
    public_key = self.get_public_key(token_header)
    # The algorithm is pinned rather than taken from the token header, so the
    # token cannot choose how it is verified.
    # NOTE(review): RS256 is assumed for Azure AD tokens - confirm.
    # The setting name is spelled as declared in the settings snippet
    # (``OUATH2_AUDIENCE``).
    return jwt.decode(
        access_token,
        public_key,
        algorithms=['RS256'],
        audience=settings.OUATH2_AUDIENCE,
    )
from django.conf import settings
from django.http import HttpResponse
from django.core.cache import cache
import jwt
import requests
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends import default_backend
class AccessTokenMiddleware:
    """Middleware that rejects requests lacking a valid bearer access token.

    For each request the token is taken from the Authorization header and
    verified against the signing certificate published at the JWKS location
    named by ``settings.OAUTH2_SETTINGS_URL``. Requests that fail are answered
    with HTTP 401 and never reach the view.
    """

    # Prefix under which signing certificates are cached (one entry per kid).
    PUBLIC_KEY_CACHEKEY = 'public_key'
    # Authorization scheme expected in front of the token.
    AUTH_TYPE_PREFIX = 'Bearer'
    # Cache lifetime for a fetched certificate, in seconds.
    TWELVE_HOURS = 12 * 60 * 60
    # Seconds to wait for each HTTP call before giving up.
    # NOTE(review): 10 s is an arbitrary choice - tune as needed.
    HTTP_TIMEOUT = 10

    def __init__(self, get_response):
        """Store the next handler in the chain.

        Args:
            get_response: Callable that takes a request and returns a response.
        """
        self.get_response = get_response
        # One-time configuration and initialization.

    def __call__(self, request):
        """Authenticate the request, then hand it to the next handler.

        Returns:
            The downstream response when the token is accepted, otherwise a
            401 ``HttpResponse``.
        """
        # Code to be executed for each request before
        # the view (and later middleware) are called.
        try:
            access_token = self.get_access_token(request)
            # Without this call the extracted token would never be validated.
            self.check_access_token(access_token)
        except (jwt.DecodeError, ValueError):
            return HttpResponse('Unauthorized', status=401)
        except Exception as e:
            # NOTE(review): the exception text is echoed to the client;
            # consider logging it instead.
            return HttpResponse('Unauthorized [' + str(e) + ']', status=401)

        # Only authentication is guarded above, so failures raised further
        # down the chain are not reported as 401.
        response = self.get_response(request)
        # Code to be executed for each request/response after
        # the view is called.
        return response

    def get_access_token(self, request):
        """Extract the bearer token from the Authorization header.

        Raises:
            ValueError: If the header is absent or empty, its scheme differs
                from ``AUTH_TYPE_PREFIX``, or no token follows the scheme.
        """
        authorization_header = request.META.get('HTTP_AUTHORIZATION')
        if not authorization_header:
            raise ValueError('Invalid token')

        bearer, _, token = authorization_header.partition(' ')
        token = token.strip()
        if bearer != self.AUTH_TYPE_PREFIX or not token:
            raise ValueError('Invalid token')
        return token

    def check_access_token(self, access_token):
        """Verify the token's signature and claims; return the decoded claims.

        Raises:
            jwt.DecodeError: If the token cannot be parsed.
            Exception: Any other error raised by ``jwt.decode`` or while
                obtaining the public key.
        """
        # The header is read unverified only to learn which key signed it.
        token_header = jwt.get_unverified_header(access_token)
        public_key = self.get_public_key(token_header)
        # The algorithm is pinned rather than taken from the token header.
        # NOTE(review): RS256 is assumed for Azure AD tokens - confirm.
        # The setting name is spelled as declared in the settings snippet
        # (``OUATH2_AUDIENCE``).
        return jwt.decode(
            access_token,
            public_key,
            algorithms=['RS256'],
            audience=settings.OUATH2_AUDIENCE,
        )

    def get_public_key(self, token_header):
        """Return the public key for the token's ``kid``, caching its certificate.

        Args:
            token_header: Decoded JWT header; its ``kid`` selects the key.

        Raises:
            KeyError: If the header has no ``kid``.
        """
        # The key id is part of the cache key: a single shared slot would keep
        # serving a stale certificate after the signing key changes.
        cache_key = '{}:{}'.format(self.PUBLIC_KEY_CACHEKEY, token_header['kid'])
        cert = cache.get(cache_key)
        if cert is None:
            cert = self.fetch_azure_signature(token_header)
            cache.set(cache_key, cert, self.TWELVE_HOURS)
        public_key = load_pem_x509_certificate(cert.encode(), default_backend()).public_key()
        return public_key

    def fetch_azure_signature(self, token_header):
        """Download the PEM certificate whose ``kid`` matches the token header.

        Raises:
            ValueError: If no published key matches or it has no ``x5c`` entry.
            requests.RequestException: On HTTP failure or timeout.
        """
        # Fetch Azure OpenID configuration document
        res = requests.get(settings.OAUTH2_SETTINGS_URL, timeout=self.HTTP_TIMEOUT)
        res.raise_for_status()
        jwk_uri = res.json()['jwks_uri']

        # From Azure configuration document find where the keys are
        res = requests.get(jwk_uri, timeout=self.HTTP_TIMEOUT)
        res.raise_for_status()
        jwk_keys = res.json()

        # Iterate JWK keys and extract matching x5c chain
        x5c = None
        for key in jwk_keys['keys']:
            if key.get('kid') == token_header['kid']:
                x5c = key.get('x5c')
                break
        if not x5c:
            # Unknown key id: reject instead of failing on ``None[0]``.
            raise ValueError('Unknown signing key')

        # It is already base64 encoded, just wrap it as a certificate
        cert = ''.join([
            '-----BEGIN CERTIFICATE-----\n',
            x5c[0],
            '\n-----END CERTIFICATE-----\n',
        ])
        return cert
curl -X POST https://login.microsoftonline.com/<tenant id>/oauth2/token \
-F grant_type=client_credentials \
-F client_id=<client application id> \
-F client_secret=<client secret key> \
-F resource=<server application id>
def get_public_key(self, token_header):
    """Return the public key for the token's ``kid``, caching its certificate.

    Args:
        token_header: Decoded JWT header; its ``kid`` selects the key.

    Returns:
        The public key object extracted from the PEM certificate.

    Raises:
        KeyError: If the header has no ``kid``.
    """
    # The key id is part of the cache key: a single shared slot would keep
    # serving a stale certificate after the signing key changes.
    cache_key = '{}:{}'.format(self.PUBLIC_KEY_CACHEKEY, token_header['kid'])
    cert = cache.get(cache_key)
    if cert is None:
        cert = self.fetch_azure_signature(token_header)
        cache.set(cache_key, cert, self.TWELVE_HOURS)
    public_key = load_pem_x509_certificate(cert.encode(), default_backend()).public_key()
    return public_key
def fetch_azure_signature(self, token_header):
    """Download the PEM certificate whose ``kid`` matches the token header.

    Args:
        token_header: Decoded JWT header; its ``kid`` selects the key.

    Returns:
        The first ``x5c`` entry of the matching key, wrapped in PEM
        BEGIN/END CERTIFICATE markers.

    Raises:
        ValueError: If no published key matches or it has no ``x5c`` entry.
        requests.RequestException: On HTTP failure or timeout.
    """
    # Fetch Azure OpenID configuration document
    # A timeout keeps an unresponsive endpoint from blocking the request.
    # NOTE(review): 10 s is an arbitrary choice - tune as needed.
    res = requests.get(settings.OAUTH2_SETTINGS_URL, timeout=10)
    res.raise_for_status()
    jwk_uri = res.json()['jwks_uri']

    # From Azure configuration document find where the keys are
    res = requests.get(jwk_uri, timeout=10)
    res.raise_for_status()
    jwk_keys = res.json()

    # Iterate JWK keys and extract matching x5c chain
    x5c = None
    for key in jwk_keys['keys']:
        if key.get('kid') == token_header['kid']:
            x5c = key.get('x5c')
            break
    if not x5c:
        # Unknown key id: reject instead of failing on ``None[0]``.
        raise ValueError('Unknown signing key')

    # It is already base64 encoded, just wrap it as a certificate
    cert = ''.join([
        '-----BEGIN CERTIFICATE-----\n',
        x5c[0],
        '\n-----END CERTIFICATE-----\n',
    ])
    return cert
"aud": "<your api application id>",
"iss": "",
"iat": 1538249692,
"nbf": 1538249692,
"exp": 1538253592,
"aio": "42BgYEgSW5MV4i58UDPvAqPPJ3djAA==",
"appid": "cb974946-528a-4168-97c4-b4d11178f5dd",
"appidacr": "1",
"idp": "",
"oid": "6f47c4af-865a-41a9-8838-3b481506c86c",
"sub": "6f47c4af-865a-41a9-8838-3b481506c86c",
"tid": "651c54ee-ecc6-49af-87a5-b83e93bd1519",
"ver": "1.0"
# OAuth2 related settings
OUATH2_AUDIENCE = '<Your api application id>'
# Caching
'default': {
'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
# Register the custom middleware to verify the access token
