Skip to content

Instantly share code, notes, and snippets.

@andybeak andybeak/access_token.py
Last active Sep 29, 2018

Embed
What would you like to do?
LinkedIn article on Django and Azure
def get_access_token(self, request):
authorization_header = request.META['HTTP_AUTHORIZATION']
bearer, _, token = authorization_header.partition(' ')
if bearer != self.AUTH_TYPE_PREFIX:
raise ValueError('Invalid token')
return token
{
"token_type": "Bearer",
"expires_in": "3599",
"ext_expires_in": "0",
"expires_on": "1538253592",
"not_before": "1538249692",
"resource": "2f50eb0f-b4f7-4a33-83d6-2223f3a7c61a",
"access_token": "JWT token"
}
def __call__(self, request):# Code to be executed for each request before# the view (and later middleware) are called.try:
access_token = self.get_access_token(request)
self.check_access_token(access_token)
response = self.get_response(request)
except (jwt.DecodeError, ValueError) as e:
response = HttpResponse('Unauthorized', status=401)
except Exception as e:
response = HttpResponse('Unauthorized', status=401)
# Code to be executed for each request/response after# the view is called.
return response
def check_access_token(self, access_token):
token_header = jwt.get_unverified_header(access_token)
public_key = self.get_public_key(token_header)
jwt.decode(
access_token,
public_key,
algorithms=token_header['alg'],
audience=settings.OUATH2_AUDIENCE,
)
from django.conf import settings
from django.http import HttpResponse
from django.core.cache import cache
import jwt
import requests
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends import default_backend
class AccessTokenMiddleware:
AUTH_TYPE_PREFIX = 'Bearer'
PUBLIC_KEY_CACHEKEY = 'public_key'
TWELVE_HOURS = 43200
def __init__(self, get_response):
self.get_response = get_response
# One-time configuration and initialization.
def __call__(self, request):
# Code to be executed for each request before
# the view (and later middleware) are called.
try:
access_token = self.get_access_token(request)
self.check_access_token(access_token)
response = self.get_response(request)
except (jwt.DecodeError, ValueError) as e:
response = HttpResponse('Unauthorized', status=401)
except Exception as e:
response = HttpResponse('Unauthorized [' + str(e) + ']', status=401)
# Code to be executed for each request/response after
# the view is called.
return response
def get_access_token(self, request):
authorization_header = request.META['HTTP_AUTHORIZATION']
bearer, _, token = authorization_header.partition(' ')
if bearer != self.AUTH_TYPE_PREFIX:
raise ValueError('Invalid token')
return token
def check_access_token(self, access_token):
token_header = jwt.get_unverified_header(access_token)
public_key = self.get_public_key(token_header)
jwt.decode(
access_token,
public_key,
algorithms=token_header['alg'],
audience=settings.OUATH2_AUDIENCE,
)
def get_public_key(self, token_header):
cert = cache.get(self.PUBLIC_KEY_CACHEKEY)
if cert is None:
cert = self.fetch_azure_signature(token_header)
cache.set(self.PUBLIC_KEY_CACHEKEY, cert, self.TWELVE_HOURS)
public_key = load_pem_x509_certificate(cert.encode(), default_backend()).public_key()
return public_key
def fetch_azure_signature(self, token_header):
# Fetch Azure OpenID configuration document
res = requests.get(settings.OAUTH2_SETTINGS_URL)
jwk_uri = res.json()['jwks_uri']
# From Azure configuration document find where the keys are
res = requests.get(jwk_uri)
jwk_keys = res.json()
# Iterate JWK keys and extract matching x5c chain
x5c = None
for key in jwk_keys['keys']:
if key['kid'] == token_header['kid']:
x5c = key['x5c']
# It is already base64 encoded, just wrap it as a certificate
cert = ''.join([
'-----BEGIN CERTIFICATE-----\n',
x5c[0],
'\n-----END CERTIFICATE-----\n',
])
return cert
curl -X POST \
https://login.microsoftonline.com/<tenant> id/oauth2/token \
-F grant_type=client_credentials \
-F client_id=<client application id> \
-F client_secret=<client secret key> \
-F resource=<server application id>
def get_public_key(self, token_header):
cert = cache.get(self.PUBLIC_KEY_CACHEKEY)
if cert is None:
cert = self.fetch_azure_signature(token_header)
cache.set(self.PUBLIC_KEY_CACHEKEY, cert, self.TWELVE_HOURS)
public_key = load_pem_x509_certificate(cert.encode(), default_backend()).public_key()
return public_key
def fetch_azure_signature(self, token_header):# Fetch Azure OpenID configuration document
res = requests.get(settings.OAUTH2_SETTINGS_URL)
jwk_uri = res.json()['jwks_uri']
# From Azure configuration document find where the keys are
res = requests.get(jwk_uri)
jwk_keys = res.json()
# Iterate JWK keys and extract matching x5c chain
x5c = Nonefor key in jwk_keys['keys']:
if key['kid'] == token_header['kid']:
x5c = key['x5c']
# It is already base64 encoded, just wrap it as a certificate
cert = ''.join([
'-----BEGIN CERTIFICATE-----\n',
x5c[0],
'\n-----END CERTIFICATE-----\n',
])
return cert
{
"aud": "<your api application id>",
"iss": "https://sts.windows.net/651c54ee-ecc6-49af-87a5-b83e93bd1519/",
"iat": 1538249692,
"nbf": 1538249692,
"exp": 1538253592,
"aio": "42BgYEgSW5MV4i58UDPvAqPPJ3djAA==",
"appid": "cb974946-528a-4168-97c4-b4d11178f5dd",
"appidacr": "1",
"idp": "https://sts.windows.net/651c54ee-ecc6-49af-87a5-b83e93bd1519/",
"oid": "6f47c4af-865a-41a9-8838-3b481506c86c",
"sub": "6f47c4af-865a-41a9-8838-3b481506c86c",
"tid": "651c54ee-ecc6-49af-87a5-b83e93bd1519",
"uti": "SLQQHbUpLEqNYFSAsKZSAQ",
"ver": "1.0"
}
# OAuth2 related settings
OUATH2_AUDIENCE = '<Your api application id>'
OAUTH2_SETTINGS_URL = 'https://login.microsoftonline.com/common/.well-known/openid-configuration'
# Caching
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
'LOCATION': '127.0.0.1:11211',
}
}
# Register the custom middleware to verify the access token
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'api.middleware.checkaccesstoken.AccessTokenMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.