Last active
January 3, 2016 11:09
-
-
Save moskrc/8453934 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib | |
import jwt | |
import hashlib | |
import datetime | |
import calendar | |
import requests | |
import urlparse | |
import httplib | |
from addon.models import JWT | |
from requests.auth import AuthBase | |
# Turn on httplib's connection-level debug output for the whole process:
# a non-zero ``debuglevel`` makes httplib print the requests it sends and the
# response headers it receives to stdout.
# NOTE(review): this is a class-level (global) switch and will also dump the
# Authorization header carrying the JWT -- presumably meant for debugging only.
httplib.HTTPConnection.debuglevel = 1
class JwtAuth(AuthBase):
    """Auth hook for ``requests`` that stamps a JWT onto an outgoing request.

    An instance is passed as ``auth=...``; when invoked with the prepared
    request it sets the ``Authorization`` header to ``JWT <token>`` and forces
    a JSON ``Content-Type``.
    """

    def __init__(self, jwt_token):
        # Stored as given; it is interpolated into the header on every call.
        self.jwt_token = jwt_token

    def __call__(self, r):
        """Set the JWT and content-type headers on ``r`` and return it."""
        # A tuple of pairs keeps the assignment order explicit:
        # Authorization first, then Content-Type.
        forced_headers = (
            ('Authorization', u'JWT %s' % self.jwt_token),
            ('Content-Type', 'application/json;charset=UTF-8'),
        )
        for header_name, header_value in forced_headers:
            r.headers[header_name] = header_value
        return r
def create_canonical_request(method, url):
    """Build the canonical request string used for an Atlassian Connect ``qsh``.

    The result has the shape ``METHOD&path&canonical-query`` where:

    * ``METHOD`` is ``method`` upper-cased;
    * ``path`` is the URL path without trailing slashes (``/`` when empty);
    * ``canonical-query`` is every query parameter except ``jwt``, with names
      and values percent-encoded, sorted by encoded name, repeated values
      sorted and joined with ``,``, and parameters joined with ``&``.

    The second ``&`` is always present, so a URL whose only parameter is
    ``jwt`` (or that has no query at all) yields ``METHOD&path&``.

    :param method: HTTP method name, any case.
    :param url: absolute or relative URL, optionally with a query string.
    :returns: the canonical request as a text string.
    """
    # Imported locally so this helper runs on Python 2 (which the rest of the
    # file targets) as well as on Python 3.
    try:
        from urllib.parse import parse_qs, quote_plus, urlparse as split_url
    except ImportError:  # Python 2
        from urllib import quote_plus
        from urlparse import parse_qs, urlparse as split_url

    print(u'-- Call: create_canonical_request(%s, %s)' % (method, url))

    def _percent_encode(text):
        # quote_plus encodes spaces as '+'; the canonical form wants '%20'
        # and a literal '~'.
        return quote_plus(text).replace(u'+', u'%20').replace(u'%7E', u'~')

    parsed_url = split_url(url)  # parse once, reuse for query and path

    encoded_params = []
    for name, values in parse_qs(parsed_url.query).items():
        if name == u'jwt':
            # The token itself is never part of the hash it carries.
            continue
        encoded_values = sorted(_percent_encode(value) for value in values)
        encoded_params.append((_percent_encode(name), encoded_values))
    encoded_params.sort()

    canonical_query = u'&'.join(
        u'%s=%s' % (name, u','.join(values)) for name, values in encoded_params
    )

    qsh = u'%(method)s&%(path)s&%(canonical_query)s' % {
        'method': method.upper(),
        'path': parsed_url.path.rstrip('/') or '/',
        'canonical_query': canonical_query,
    }
    print(u'-- Canonical request (sorted): %s' % qsh)
    return qsh
def create_query_string_hash(method, url):
    """Return the hex SHA-256 digest of the canonical request for ``method``/``url``.

    :param method: HTTP method name, passed through to
        ``create_canonical_request``.
    :param url: URL whose path and query string are hashed.
    :returns: the digest as a hexadecimal string.
    """
    canonical_request = create_canonical_request(method, url)
    # Hash explicit UTF-8 bytes: hashing a text string relies on an implicit
    # ASCII encode on Python 2 and is a TypeError on Python 3.
    return hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
def send_jira_request(r, url, method, data=None):
    """Call the JIRA REST API on behalf of the tenant that sent request ``r``.

    Steps, each traced with ``print``:

    1. Decode (without verification) the ``jwt`` query parameter of ``r`` and
       look up the stored ``JWT`` row whose ``client_key`` equals its ``iss``.
    2. Recompute the query string hash of the incoming request and compare it
       with the token's ``qsh`` claim; then try to decode the token with the
       stored shared secret. Both outcomes are only reported.
    3. Build a new token (``iss``, ``iat``, ``exp`` = now + 3 minutes, ``qsh``
       of the outgoing request, and ``sub`` when the incoming token has one),
       sign it with the shared secret, and send the request.

    :param r: incoming request; must provide ``GET``, ``META`` and
        ``get_full_path()`` (presumably a Django ``HttpRequest`` -- confirm).
    :param url: path appended to the stored ``base_url``.
    :param method: ``'GET'``, ``'POST'`` or ``'PUT'`` (compared as given).
    :param data: body passed to ``requests`` for POST/PUT; defaults to an
        empty dict.
    :returns: the ``requests`` response object.
    :raises Exception: ``'Unimplemented'`` for any other ``method``.
    """
    if data is None:
        data = {}

    incoming_token = r.GET.get('jwt')

    # find the client_key in the database by 'iss' from atlassian JWT
    jwt_decoded = jwt.decode(incoming_token, verify=False)
    print(u'-- JWT from the request header: %s' % jwt_decoded)
    jwt_model = JWT.objects.get(client_key=jwt_decoded['iss'])

    # url for request
    full_url = jwt_model.base_url + url
    print(u'-- Full Url: %s' % full_url)

    # verify QSH from Atlassian
    # NOTE(review): a QSH mismatch or a failed signature check below is only
    # printed -- the function still signs and sends the outgoing request.
    # That looks deliberate for this debugging script; production code should
    # reject the request instead.
    print(u'-- Verify QSH from JWT token')
    claimed_qsh = jwt_decoded.get('qsh')
    calculated_qsh = create_query_string_hash(r.META['REQUEST_METHOD'], r.get_full_path())
    print(u'-- QSH from JWT and calculated: %s and %s' % (claimed_qsh, calculated_qsh))
    if claimed_qsh != calculated_qsh:
        print(u'!!-- ERROR, THEY ARE NOT IDENTICAL')
    else:
        print(u'-- SUCCESS, THEY ARE IDENTICAL!')

    # try to decode with secret key
    try:
        jwt.decode(incoming_token, jwt_model.shared_secret)
    except Exception:
        print(u'!! JWT DECODE ERROR')
    else:
        print(u'-- JWT WAS SUCCESSFULLY VERIFIED!')

    # make a qsh
    qsh = create_query_string_hash(method, full_url)
    print(u'-- QSH (sha256): %s' % qsh)

    # make a payload for JWT
    iat = datetime.datetime.utcnow()
    exp = iat + datetime.timedelta(minutes=3)
    jwt_payload = {
        u'iss': jwt_model.client_key,
        u'iat': calendar.timegm(iat.utctimetuple()),
        u'exp': calendar.timegm(exp.utctimetuple()),
        u'qsh': qsh,
    }
    # Copy 'sub' only when the incoming token carries it, so a token without
    # that claim no longer fails with KeyError.
    if 'sub' in jwt_decoded:
        jwt_payload[u'sub'] = jwt_decoded['sub']
    print(u'-- Making Payload: %s' % jwt_payload)

    # make a JWT with the payload
    new_jwt = jwt.encode(jwt_payload, jwt_model.shared_secret)
    print(u'-- New JWT: %s' % new_jwt)

    print(u'-- Try to send request')
    if method == 'GET':
        print('-- ' * 30)
        response = requests.get(full_url, params={'jwt': new_jwt}, auth=JwtAuth(new_jwt))
        print('-- ' * 30)
    elif method in ('POST', 'PUT'):
        # Send with the verb the QSH was computed for; previously PUT went
        # out as POST, which cannot match the signed hash.
        response = requests.request(method, full_url, params={'jwt': new_jwt},
                                    auth=JwtAuth(new_jwt), data=data)
    else:
        raise Exception('Unimplemented')

    # Hand the response back so callers can inspect status and body.
    return response
# Example invocation: print a banner, then the result of requesting
# '/rest/api/2/project' with GET through send_jira_request.
# NOTE(review): `request` is not defined anywhere in the visible file --
# presumably this snippet is meant to run inside a view where the incoming
# request is in scope; confirm before executing at module level.
for banner_line in ('TRY TO SEND REQUEST', '-------------------', '\n'):
    print(banner_line)
print(send_jira_request(request, '/rest/api/2/project', 'GET', {}))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment