Skip to content

Instantly share code, notes, and snippets.

@Hornswoggles
Created May 17, 2017 03:15
Show Gist options
  • Save Hornswoggles/2ef7177aa8eb614d674ea9b9bf1be819 to your computer and use it in GitHub Desktop.
Save Hornswoggles/2ef7177aa8eb614d674ea9b9bf1be819 to your computer and use it in GitHub Desktop.
Mini web service with optional cert and jwt authentication and authorization
"""
This is a webservice that is implemented as a standalone twisted plugin.
All of the classes here could be separated out into distinct files but for
this example I find it easier to read if it's all together.
- Validates client certificate's
- Implements Twisted Cred to Authenticate Requests using JSON Web Tokens
TODO:
Create the Avatar Resource class that will perform authorization against our ACL
Setup:
Twisted plugins can be automatically discovered within a twisted/plugins directory
for any project in the python package path.
$ cd /opt/project
$ mkdir -p twisted/plugins
$ cd twisted/plugins
$ wget >> mini.py
Invoke this plugin using twistd:
$ twistd --pidfile jwtauth.pid -n jwtauth -p 4442
NOTE: Twisted plugins have a cache that will be placed in the "twisted/plugins"
directly called "dropin.cache". If twistd cannot find the plugin then you may have
to delete this cache as well as all pyc files.
This will start the webservice on port 4442. Next construct your curl request:
sudo curl -vE /etc/certs/server.pem "https://localhost:4442/healthcheck" -XGET -H "Authorization: Bearer ..."
"""
# Fixtures Used
TOKEN_DATA = {u'aud': '00000000000000000000000000000001', u'iss': u'https://mytest.auth0.com/', u'mytest': {u'user_email': 'test_user@bob.com', u'my_ou': u'engineering'}, u'exp': 1448399133, u'iat': 1444677561, u'sub': u'samlp|test_user@bob.com'}
PRIVATE_KEY = '-----BEGIN RSA PRIVATE KEY-----\nSCRUBBED\n-----END RSA PRIVATE KEY-----'
# Expires in 50+ Years
VALID_TOKEN = 'SCRUBBED'
KEYS = [
{
'pubkey': '-----BEGIN PUBLIC KEY-----\nSCRUBBED\n-----END PUBLIC KEY-----',
'audience': '00000000000000000000000000000001',
}
]
ALGORITHMS = ['RS256']
from zope.interface import implements
import jwt
import re
from twisted.python import usage, log
from twisted.plugin import IPlugin
from twisted.application import service, internet
from twisted.web.server import Site
from twisted.internet import ssl
from twisted.web.guard import HTTPAuthSessionWrapper
from twisted.web.resource import IResource
from twisted.web.iweb import ICredentialFactory
from twisted.cred.portal import IRealm, Portal
from twisted.cred.credentials import ICredentials
from twisted.cred.checkers import ICredentialsChecker
from twisted.cred.error import UnauthorizedLogin
from twisted.cred import error
class ServiceOptions(usage.Options):
optParameters = [
["port", "p", 4442, "Server port number"],
]
class IBearerToken(ICredentials):
"""
IBearerToken credentials encapsulate a valid bearer token.
Parameters:
token The bearer token str
payload The decoded and valid token payload
"""
class JSONWebToken(object):
""" The BearerToken credential object that will be passed to a Checker
"""
implements(IBearerToken)
def __init__(self, token, payload):
self.token = token
self.payload = payload
class JSONWebTokenCredentialFactory(object):
implements(ICredentialFactory)
scheme = 'bearer'
def __init__(self, algorithms, keys):
"""
algorithms list. List of algorithms to check.
keys list. Auth0 keys
"""
self.algorithms = algorithms
self.keys = keys
@staticmethod
def getChallenge(address):
"""
Generate the challenge for use in the WWW-Authenticate header
@param request: The L{twisted.web.http.Request}
@return: The C{dict} that can be used to generate a WWW-Authenticate
header.
"""
# Implement http://self-issued.info/docs/draft-ietf-oauth-v2-bearer.html#authn-header
return {}
def decode(self, response, request):
""" Returns JSONWebToken (Credentials)
Create a JSONWebToken object from the given authentication_str.
response str. The bearer str minus the scheme
request obj. Ignored for now. Should be used to communicate failures
See: http://self-issued.info/docs/draft-ietf-oauth-v2-bearer.html#authn-header
"""
# Validate Bearer http://self-issued.info/docs/draft-ietf-oauth-v2-bearer.html#authn-header
payload = None
for key in self.keys:
try:
# We use an asymmetric signing algorithm so the "alg" key on the JWT cannot be trusted
# Specifying the algorithm is required and should never be set to None.
# Read this article before altering how JWT's are validated:
# https://auth0.com/blog/2015/03/31/critical-vulnerabilities-in-json-web-token-libraries/
payload = jwt.decode(
response,
key=key['pubkey'],
audience=key['audience'],
options={
'require_aud': True,
'require_exp': True,
},
algorithms=self.algorithms,
)
except (ValueError, jwt.exceptions.DecodeError, jwt.exceptions.InvalidAudience):
# This key/audience doesn't match, let's try next in a queue
continue
except jwt.exceptions.MissingRequiredClaimError as e:
raise UnauthorizedLogin('Missing require claim on JSON Web Token: {0}'.format(e))
except jwt.InvalidTokenError as e:
raise UnauthorizedLogin('Invalid or expired JSON Web Token: {0}'.format(e))
if not payload:
raise UnauthorizedLogin('No keys matching JSON Web Token')
return JSONWebToken(token=response, payload=payload)
class JSONWebTokenChecker(object):
implements(ICredentialsChecker)
credentialInterfaces = [IBearerToken]
def requestAvatarId(self, credentials):
"""
Portal will call this method if a BearerToken credential is provided.
Check the given credentials and return a suitable user identifier if
they are valid.
credentials JSONWebToken. A IBearerToken object containing a decoded token
"""
if IBearerToken.providedBy(credentials):
# For now were overloading the avatarId with the full token payload
return credentials.payload
raise NotImplementedError()
from klein import Klein
class HealthCheckAPI(object):
app = Klein()
@app.route('/healthcheck', methods=['GET'])
def healthcheck(self, request):
return 'OK'
api = HealthCheckAPI()
resource = api.app.resource
class MyRealm(object):
implements(IRealm)
def requestAvatar(self, avatar_info, mind, *interfaces):
"""
This function should return an "Avatar" that MUST implement the IResource
interface.
We now have a fully authenticated request and have an id for the "Avatar".
An "Avatar" could be a user, a server, a organization or anything else
we want to define as an Avatar.
If we issued capability based JWT's the "Resource" being requested by the
avatar would be transparent and included within the JWT. However, for now we
have to rely on our routing layer.
avatar_info dict. For now its whats returned by the JSONWebTokenChecker.
mind None. Not used and ignored
interfaces list. Interfaces this realm returns
"""
# Make it explicit that this Realm only supports IResources since it is
# being wrapped for HTTP authenticaiton
if IResource in interfaces:
logout_callback = lambda: None
# The HTTPAuthSessionWrapper is expecting any IResource to be returned from
# the realm. http://twistedmatrix.com/trac/browser/tags/releases/twisted-12.2.0/twisted/web/resource.py
# The IResource we return here could perform Authorization duties
# and routing and eventually return the proper web resource. However
# for now I'm just returning the root web_resource. I.E. There is no
# ACL or Authorization mechanism in this example.
# TODO: Return an Actor that implements IResource
return (IResource, resource(), logout_callback)
raise NotImplementedError()
def verifyPeerCallback(connection, x509, errnum, errdepth, ok):
"""
The responsibility of this callback function is to verify the peer certificate's
validitity. Resist the urge to include "authentication" or "authorization"
assertion's inside this function. Use pluggable Twisted Guard to perform
HTTP authentication and Twisted Cred to add an Access Control Layer.
This callback will be called once for every level of a certificate.
The errdepth count is "0:peer certificate", "1: CA certificate",
"2: higher level CA certificate", and so on. Setting the maximum depth to 2
allows the levels 0, 1, and 2. The default depth limit is 100, allowing for
the peer certificate and additional 100 CA certificates.
"""
if ok: # cert passed validation
if errdepth > 0: # No further validation of CA cert or higher level CA cert
return True
# Do peer certificate OU validation
subject = x509.get_subject()
issuer = x509.get_issuer()
# Example of limiting access to this service by issuer and by OU
if issuer.CN == 'pki.server.com' and subject.OU in ("developer", "sales", "marketing"):
# Valid Cert
return True
return False
class ServiceMaker(object):
implements(service.IServiceMaker, IPlugin)
tapname = "jwtauth"
description = "This server has a healthcheck."
options = ServiceOptions
def makeService(self, options):
"""Construct a service object."""
# The protocol factory is, like the realm, something you implement.
myContextFactory = ssl.DefaultOpenSSLContextFactory(
'/etc/certs/server.key',
'/etc/certs/server.crt'
)
ctx = myContextFactory.getContext()
if hasattr(ctx, 'set_session_cache_mode'):
ctx.set_session_cache_mode(0)
# Require a peer cert
ctx.set_verify(
ssl.SSL.VERIFY_PEER | ssl.SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verifyPeerCallback
)
ctx.set_options(ssl.SSL.OP_NO_SSLv2 | ssl.SSL.OP_NO_SSLv3)
# Since we have self-signed certs we have to explicitly
# tell the server to trust them.
ctx.load_verify_locations("/etc/certs/ca.crt")
# Realm: retrieves the resource for an authenticated avatarId
realm = MyRealm()
# Checkers: Validates the credentials and returns the avatar
checkers = [JSONWebTokenChecker()]
# Portal: The authentication gateway
portal = Portal(realm, checkers)
# Credential Factory: Produces credentials
credential_factories = [JSONWebTokenCredentialFactory(ALGORITHMS, KEYS)]
# HTTPAuthSessionWrapper: The logic to support HTTP Authentication schemes
resource = HTTPAuthSessionWrapper(portal, credential_factories)
# The HTTPFactory encapsulates the logic for managing the site sessions and resources
site = Site(resource)
# Finally, return a service that will listen for connections.
return internet.SSLServer(options["port"], site, myContextFactory, interface='::')
# As in our example above, we have to construct an object that
# provides the IPlugin and IServiceMaker interfaces.
serviceMaker = ServiceMaker()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment