Skip to content

Instantly share code, notes, and snippets.

@valdisrigdon
Created June 1, 2020 20:48
Show Gist options
  • Save valdisrigdon/92fb6de357ef47f9bcfabb59562ad8f4 to your computer and use it in GitHub Desktop.
Save valdisrigdon/92fb6de357ef47f9bcfabb59562ad8f4 to your computer and use it in GitHub Desktop.
import jwt
import base64
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
def ensure_bytes(key: "str | bytes") -> bytes:
    """Return ``key`` as bytes, UTF-8 encoding it when it is a ``str``.

    Any value that is not a ``str`` is returned unchanged.
    """
    if isinstance(key, str):
        return key.encode('utf-8')
    return key
def decode_value(val):
    """Decode a base64url-encoded value into a non-negative integer.

    Args:
        val: The base64url text, as ``str`` or ``bytes``.

    Returns:
        The decoded bytes interpreted as a big-endian unsigned integer.
    """
    # The values handled here (see the "e" and "n" entries of the key set)
    # carry no '=' padding, so two padding characters are appended before
    # decoding; that is the most a base64 string can ever require.
    padded = ensure_bytes(val) + b'=='
    decoded = base64.urlsafe_b64decode(padded)
    return int.from_bytes(decoded, 'big')
def rsa_pem_from_jwk(jwk):
    """Build a PEM-encoded RSA public key from a JWK mapping.

    Args:
        jwk: Mapping with base64url-encoded ``'n'`` (modulus) and ``'e'``
            (public exponent) entries.

    Returns:
        The public key serialized as PEM in SubjectPublicKeyInfo format.

    Raises:
        KeyError: If ``jwk`` has no ``'n'`` or ``'e'`` entry.
    """
    public_numbers = RSAPublicNumbers(
        n=decode_value(jwk['n']),
        e=decode_value(jwk['e']),
    )
    public_key = public_numbers.public_key(default_backend())
    return public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
# JSON Web Key Set (JWKS) with the public keys used to verify token signatures.
# Obtain it however suits you: a configuration file, or an HTTP GET request to
# the endpoint that publishes the keys. Each entry is looked up by its "kid";
# its "n" and "e" values are the base64url-encoded RSA modulus and exponent.
jwks = {
"keys": [
{
"kty": "RSA",
"kid": "kewiQq9jiC84CvSsJYOB-N6A8WFLSV20Mb-y7IlWDSQ",
"e": "AQAB",
"n": "5RyvCSgBoOGNE03CMcJ9Bzo1JDvsU8XgddvRuJtdJAIq5zJ8fiUEGCnMfAZI4of36YXBuBalIycqkgxrRkSOENRUCWN45bf8xsQCcQ8zZxozu0St4w5S-aC7N7UTTarPZTp4BZH8ttUm-VnK4aEdMx9L3Izo0hxaJ135undTuA6gQpK-0nVsm6tRVq4akDe3OhC-7b2h6z7GWJX1SD4sAD3iaq4LZa8y1mvBBz6AIM9co8R-vU1_CduxKQc3KxCnqKALbEKXm0mTGsXha9aNv3pLNRNs_J-cCjBpb1EXAe_7qOURTiIHdv8_sdjcFTJ0OTeLWywuSf7mD0Wpx2LKcD6ImENbyq5IBuR1e2ghnh5Y9H33cuQ0FRni8ikq5W3xP3HSMfwlayhIAJN_WnmbhENRU-m2_hDPiD9JYF2CrQneLkE3kcazSdtarPbg9ZDiydHbKWCV-X7HxxIKEr9N7P1V5HKatF4ZUrG60e3eBnRyccPwmT66i9NYyrcy1_ZNN8D1DY8xh9kflUDy4dSYu4R7AEWxNJWQQov525v0MjD5FNAS03rpk4SuW3Mt7IP73m-_BpmIhW3LZsnmfd8xHRjf0M9veyJD0--ETGmh8t3_CXh3I3R9IbcSEntUl_2lCvc_6B-m8W-t2nZr4wvOq9-iaTQXAn1Au6EaOYWvDRE",
"use": "sig",
"alg": "RS256"
}
]
}
# Expected value of the token's "iss" claim; passed to jwt.decode() as `issuer`.
issuer = 'gitlab.com' # iss
class InvalidAuthorizationToken(Exception):
    """Raised when a token cannot be matched to a known signing key.

    The exception message is ``'Invalid authorization token: '`` followed by
    the supplied details; the raw details stay available as ``details``.
    """

    def __init__(self, details):
        super().__init__('Invalid authorization token: ' + details)
        # Keep the unprefixed reason so callers need not parse the message.
        self.details = details
def get_kid(token):
    """Return the ``kid`` (key id) from the token's unverified header.

    The header is read without verifying the signature, so the result is
    only suitable for selecting which key to verify with.

    Raises:
        InvalidAuthorizationToken: If the header is empty or has no ``kid``.
    """
    headers = jwt.get_unverified_header(token)
    if not headers:
        raise InvalidAuthorizationToken('missing headers')
    try:
        return headers['kid']
    except KeyError:
        # The KeyError adds nothing for the caller; do not chain it.
        raise InvalidAuthorizationToken('missing kid') from None
def get_jwk(kid):
    """Return the entry of the module-level ``jwks`` whose ``kid`` matches.

    Raises:
        InvalidAuthorizationToken: If no key in the set has the given ``kid``.
    """
    # A key set without a "keys" entry is treated as empty, so the caller
    # gets the "not recognized" error instead of a TypeError from iterating
    # over None.
    for jwk in jwks.get('keys') or []:
        if jwk.get('kid') == kid:
            return jwk
    raise InvalidAuthorizationToken('kid not recognized')
def get_public_key(token):
    """Return the PEM-encoded public key selected by the token's ``kid``.

    Reads the ``kid`` from the token header, looks up the matching JWK and
    converts it to PEM.

    Raises:
        InvalidAuthorizationToken: If the ``kid`` is missing or unknown.
    """
    kid = get_kid(token)
    jwk = get_jwk(kid)
    return rsa_pem_from_jwk(jwk)
def validate_jwt(jwt_to_validate, leeway=86400):
    """Verify a JWT's RS256 signature and issuer, then print its claims.

    Args:
        jwt_to_validate: The encoded token.
        leeway: Tolerance passed to ``jwt.decode``. The default of 86400
            seconds (one day) is kept from the original script.
            NOTE(review): this is very permissive for expiry checks;
            consider a smaller value for production use.

    Returns:
        The decoded claims.

    Raises:
        InvalidAuthorizationToken: If the token's ``kid`` is missing or
            unknown.
        KeyError: If the decoded claims have no ``project_path`` entry.
        Exception: Whatever ``jwt.decode`` raises when verification fails.
    """
    public_key = get_public_key(jwt_to_validate)
    # Signature verification is jwt.decode's default behavior; the old
    # `verify=True` keyword is deprecated, so it is no longer passed.
    decoded = jwt.decode(
        jwt_to_validate,
        public_key,
        algorithms=['RS256'],
        leeway=leeway,
        issuer=issuer,
    )
    print(decoded)
    print(decoded['project_path'])  # e.g. /appian/prod/clusters
    return decoded
def main():
    """Validate the JWT given as the first command-line argument.

    Prints a usage hint when the argument is missing or empty; otherwise
    prints whether validation succeeded, with a traceback on failure.
    """
    import sys
    import traceback

    if len(sys.argv) < 2:
        print('Please provide a JWT as script argument')
        return

    # Named `token` rather than `jwt` so the imported module is not shadowed.
    token = sys.argv[1]
    if not token:
        print('Please pass a valid JWT')
        # Stop here: an empty token must not fall through to validation.
        return

    try:
        validate_jwt(token)
    except Exception:
        # Top-level boundary: report any failure instead of crashing.
        traceback.print_exc()
        print('The JWT is not valid!')
    else:
        print('The JWT is valid!')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment