Created
May 17, 2017 03:15
-
-
Save Hornswoggles/2ef7177aa8eb614d674ea9b9bf1be819 to your computer and use it in GitHub Desktop.
Mini web service with optional cert and jwt authentication and authorization
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This is a webservice that is implemented as a standalone twisted plugin. | |
All of the classes here could be separated out into distinct files but for | |
this example I find it easier to read if it's all together. | |
- Validates client certificate's | |
- Implements Twisted Cred to Authenticate Requests using JSON Web Tokens | |
TODO: | |
Create the Avatar Resource class that will perform authorization against our ACL | |
Setup: | |
Twisted plugins can be automatically discovered within a twisted/plugins directory | |
for any project in the python package path. | |
$ cd /opt/project | |
$ mkdir -p twisted/plugins | |
$ cd twisted/plugins | |
$ wget >> mini.py | |
Invoke this plugin using twistd: | |
$ twistd --pidfile jwtauth.pid -n jwtauth -p 4442 | |
NOTE: Twisted plugins have a cache that will be placed in the "twisted/plugins" | |
directly called "dropin.cache". If twistd cannot find the plugin then you may have | |
to delete this cache as well as all pyc files. | |
This will start the webservice on port 4442. Next construct your curl request: | |
sudo curl -vE /etc/certs/server.pem "https://localhost:4442/healthcheck" -XGET -H "Authorization: Bearer ..." | |
""" | |
# Fixtures Used | |
TOKEN_DATA = {u'aud': '00000000000000000000000000000001', u'iss': u'https://mytest.auth0.com/', u'mytest': {u'user_email': 'test_user@bob.com', u'my_ou': u'engineering'}, u'exp': 1448399133, u'iat': 1444677561, u'sub': u'samlp|test_user@bob.com'} | |
PRIVATE_KEY = '-----BEGIN RSA PRIVATE KEY-----\nSCRUBBED\n-----END RSA PRIVATE KEY-----' | |
# Expires in 50+ Years | |
VALID_TOKEN = 'SCRUBBED' | |
KEYS = [ | |
{ | |
'pubkey': '-----BEGIN PUBLIC KEY-----\nSCRUBBED\n-----END PUBLIC KEY-----', | |
'audience': '00000000000000000000000000000001', | |
} | |
] | |
ALGORITHMS = ['RS256'] | |
from zope.interface import implements | |
import jwt | |
import re | |
from twisted.python import usage, log | |
from twisted.plugin import IPlugin | |
from twisted.application import service, internet | |
from twisted.web.server import Site | |
from twisted.internet import ssl | |
from twisted.web.guard import HTTPAuthSessionWrapper | |
from twisted.web.resource import IResource | |
from twisted.web.iweb import ICredentialFactory | |
from twisted.cred.portal import IRealm, Portal | |
from twisted.cred.credentials import ICredentials | |
from twisted.cred.checkers import ICredentialsChecker | |
from twisted.cred.error import UnauthorizedLogin | |
from twisted.cred import error | |
class ServiceOptions(usage.Options): | |
optParameters = [ | |
["port", "p", 4442, "Server port number"], | |
] | |
class IBearerToken(ICredentials): | |
""" | |
IBearerToken credentials encapsulate a valid bearer token. | |
Parameters: | |
token The bearer token str | |
payload The decoded and valid token payload | |
""" | |
class JSONWebToken(object): | |
""" The BearerToken credential object that will be passed to a Checker | |
""" | |
implements(IBearerToken) | |
def __init__(self, token, payload): | |
self.token = token | |
self.payload = payload | |
class JSONWebTokenCredentialFactory(object): | |
implements(ICredentialFactory) | |
scheme = 'bearer' | |
def __init__(self, algorithms, keys): | |
""" | |
algorithms list. List of algorithms to check. | |
keys list. Auth0 keys | |
""" | |
self.algorithms = algorithms | |
self.keys = keys | |
@staticmethod | |
def getChallenge(address): | |
""" | |
Generate the challenge for use in the WWW-Authenticate header | |
@param request: The L{twisted.web.http.Request} | |
@return: The C{dict} that can be used to generate a WWW-Authenticate | |
header. | |
""" | |
# Implement http://self-issued.info/docs/draft-ietf-oauth-v2-bearer.html#authn-header | |
return {} | |
def decode(self, response, request): | |
""" Returns JSONWebToken (Credentials) | |
Create a JSONWebToken object from the given authentication_str. | |
response str. The bearer str minus the scheme | |
request obj. Ignored for now. Should be used to communicate failures | |
See: http://self-issued.info/docs/draft-ietf-oauth-v2-bearer.html#authn-header | |
""" | |
# Validate Bearer http://self-issued.info/docs/draft-ietf-oauth-v2-bearer.html#authn-header | |
payload = None | |
for key in self.keys: | |
try: | |
# We use an asymmetric signing algorithm so the "alg" key on the JWT cannot be trusted | |
# Specifying the algorithm is required and should never be set to None. | |
# Read this article before altering how JWT's are validated: | |
# https://auth0.com/blog/2015/03/31/critical-vulnerabilities-in-json-web-token-libraries/ | |
payload = jwt.decode( | |
response, | |
key=key['pubkey'], | |
audience=key['audience'], | |
options={ | |
'require_aud': True, | |
'require_exp': True, | |
}, | |
algorithms=self.algorithms, | |
) | |
except (ValueError, jwt.exceptions.DecodeError, jwt.exceptions.InvalidAudience): | |
# This key/audience doesn't match, let's try next in a queue | |
continue | |
except jwt.exceptions.MissingRequiredClaimError as e: | |
raise UnauthorizedLogin('Missing require claim on JSON Web Token: {0}'.format(e)) | |
except jwt.InvalidTokenError as e: | |
raise UnauthorizedLogin('Invalid or expired JSON Web Token: {0}'.format(e)) | |
if not payload: | |
raise UnauthorizedLogin('No keys matching JSON Web Token') | |
return JSONWebToken(token=response, payload=payload) | |
class JSONWebTokenChecker(object): | |
implements(ICredentialsChecker) | |
credentialInterfaces = [IBearerToken] | |
def requestAvatarId(self, credentials): | |
""" | |
Portal will call this method if a BearerToken credential is provided. | |
Check the given credentials and return a suitable user identifier if | |
they are valid. | |
credentials JSONWebToken. A IBearerToken object containing a decoded token | |
""" | |
if IBearerToken.providedBy(credentials): | |
# For now were overloading the avatarId with the full token payload | |
return credentials.payload | |
raise NotImplementedError() | |
from klein import Klein | |
class HealthCheckAPI(object): | |
app = Klein() | |
@app.route('/healthcheck', methods=['GET']) | |
def healthcheck(self, request): | |
return 'OK' | |
api = HealthCheckAPI() | |
resource = api.app.resource | |
class MyRealm(object): | |
implements(IRealm) | |
def requestAvatar(self, avatar_info, mind, *interfaces): | |
""" | |
This function should return an "Avatar" that MUST implement the IResource | |
interface. | |
We now have a fully authenticated request and have an id for the "Avatar". | |
An "Avatar" could be a user, a server, a organization or anything else | |
we want to define as an Avatar. | |
If we issued capability based JWT's the "Resource" being requested by the | |
avatar would be transparent and included within the JWT. However, for now we | |
have to rely on our routing layer. | |
avatar_info dict. For now its whats returned by the JSONWebTokenChecker. | |
mind None. Not used and ignored | |
interfaces list. Interfaces this realm returns | |
""" | |
# Make it explicit that this Realm only supports IResources since it is | |
# being wrapped for HTTP authenticaiton | |
if IResource in interfaces: | |
logout_callback = lambda: None | |
# The HTTPAuthSessionWrapper is expecting any IResource to be returned from | |
# the realm. http://twistedmatrix.com/trac/browser/tags/releases/twisted-12.2.0/twisted/web/resource.py | |
# The IResource we return here could perform Authorization duties | |
# and routing and eventually return the proper web resource. However | |
# for now I'm just returning the root web_resource. I.E. There is no | |
# ACL or Authorization mechanism in this example. | |
# TODO: Return an Actor that implements IResource | |
return (IResource, resource(), logout_callback) | |
raise NotImplementedError() | |
def verifyPeerCallback(connection, x509, errnum, errdepth, ok): | |
""" | |
The responsibility of this callback function is to verify the peer certificate's | |
validitity. Resist the urge to include "authentication" or "authorization" | |
assertion's inside this function. Use pluggable Twisted Guard to perform | |
HTTP authentication and Twisted Cred to add an Access Control Layer. | |
This callback will be called once for every level of a certificate. | |
The errdepth count is "0:peer certificate", "1: CA certificate", | |
"2: higher level CA certificate", and so on. Setting the maximum depth to 2 | |
allows the levels 0, 1, and 2. The default depth limit is 100, allowing for | |
the peer certificate and additional 100 CA certificates. | |
""" | |
if ok: # cert passed validation | |
if errdepth > 0: # No further validation of CA cert or higher level CA cert | |
return True | |
# Do peer certificate OU validation | |
subject = x509.get_subject() | |
issuer = x509.get_issuer() | |
# Example of limiting access to this service by issuer and by OU | |
if issuer.CN == 'pki.server.com' and subject.OU in ("developer", "sales", "marketing"): | |
# Valid Cert | |
return True | |
return False | |
class ServiceMaker(object): | |
implements(service.IServiceMaker, IPlugin) | |
tapname = "jwtauth" | |
description = "This server has a healthcheck." | |
options = ServiceOptions | |
def makeService(self, options): | |
"""Construct a service object.""" | |
# The protocol factory is, like the realm, something you implement. | |
myContextFactory = ssl.DefaultOpenSSLContextFactory( | |
'/etc/certs/server.key', | |
'/etc/certs/server.crt' | |
) | |
ctx = myContextFactory.getContext() | |
if hasattr(ctx, 'set_session_cache_mode'): | |
ctx.set_session_cache_mode(0) | |
# Require a peer cert | |
ctx.set_verify( | |
ssl.SSL.VERIFY_PEER | ssl.SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verifyPeerCallback | |
) | |
ctx.set_options(ssl.SSL.OP_NO_SSLv2 | ssl.SSL.OP_NO_SSLv3) | |
# Since we have self-signed certs we have to explicitly | |
# tell the server to trust them. | |
ctx.load_verify_locations("/etc/certs/ca.crt") | |
# Realm: retrieves the resource for an authenticated avatarId | |
realm = MyRealm() | |
# Checkers: Validates the credentials and returns the avatar | |
checkers = [JSONWebTokenChecker()] | |
# Portal: The authentication gateway | |
portal = Portal(realm, checkers) | |
# Credential Factory: Produces credentials | |
credential_factories = [JSONWebTokenCredentialFactory(ALGORITHMS, KEYS)] | |
# HTTPAuthSessionWrapper: The logic to support HTTP Authentication schemes | |
resource = HTTPAuthSessionWrapper(portal, credential_factories) | |
# The HTTPFactory encapsulates the logic for managing the site sessions and resources | |
site = Site(resource) | |
# Finally, return a service that will listen for connections. | |
return internet.SSLServer(options["port"], site, myContextFactory, interface='::') | |
# As in our example above, we have to construct an object that | |
# provides the IPlugin and IServiceMaker interfaces. | |
serviceMaker = ServiceMaker() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment