Created
August 20, 2019 16:37
-
-
Save dmsimard/eea8812022cc9325bbf4c06e349f5705 to your computer and use it in GitHub Desktop.
reauth
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import re | |
from urllib import parse as urlparse | |
import requests | |
def _image_tag_from_url(image_url): | |
if '@' in image_url.path: | |
parts = image_url.path.split('@') | |
else: | |
parts = image_url.path.split(':') | |
tag = parts[-1] | |
image = ':'.join(parts[:-1]) | |
return image, tag | |
def _build_url(url, path): | |
return '%s://%s%s' % (url.scheme, url.netloc, url.path) | |
class ImageUploaderException(Exception):
    """Raised when a registry authentication challenge cannot be handled."""


# Scope requested from the token service when the caller gives none.
# NOTE(review): this used to be derived from the image path
# ('repository:%s:pull' % image[1:]); presumably it was pinned because the
# URL under test is a blob URL rather than an image URL -- confirm.
_DEFAULT_SCOPE = 'repository:tripleomaster/centos-binary-nova-compute:pull'


def _www_auth_param(www_auth, name):
    """Return the double-quoted value of ``name`` in a challenge, or None."""
    match = re.search(r'%s="(.*?)"' % re.escape(name), www_auth)
    return match.group(1) if match else None


def authenticate(image_url, username=None, password=None, session=None,
                 scope=None):
    """Obtain a bearer token for ``image_url`` and store it on a session.

    The URL is requested first.  A 200 response means the session is
    already authorised and it is returned untouched.  A 401 response is
    expected to carry a ``Bearer`` ``www-authenticate`` challenge; the
    token is then fetched from the challenge's realm and written to the
    session's ``Authorization`` header.

    The returned session also gets two extra attributes so the caller can
    re-authenticate later with ``session.authenticate(**session.auth_args)``.

    :param image_url: parsed URL (``urllib.parse.urlparse`` result).
    :param username: optional user name for HTTP basic auth against the
        token realm.
    :param password: password used together with ``username``.
    :param session: existing ``requests.Session`` to reuse; a new one is
        created when omitted.
    :param scope: token scope to request; defaults to ``_DEFAULT_SCOPE``.
    :returns: the authenticated session.
    :raises ImageUploaderException: if the 401 response has no usable
        ``Bearer`` challenge (header missing, other scheme, or no realm).
    :raises requests.HTTPError: for other error responses from the
        registry or from the token realm.
    """
    print("image_url: %s" % str(image_url))
    url = _build_url(image_url, path='/')
    print("url: %s" % str(url))

    if not session:
        session = requests.Session()
        # NOTE(review): certificate verification is switched off for
        # sessions created here.  Pass in a pre-configured session when
        # talking to a registry over TLS.
        session.verify = False

    r = session.get(url, timeout=30)
    if r.status_code == 200:
        return session
    if r.status_code != 401:
        r.raise_for_status()

    if 'www-authenticate' not in r.headers:
        raise ImageUploaderException(
            'Unknown authentication method for headers: %s' % r.headers)
    www_auth = r.headers['www-authenticate']
    print('www-authenticate', www_auth)
    if not www_auth.startswith('Bearer '):
        raise ImageUploaderException(
            'Unknown www-authenticate value: %s' % www_auth)

    realm = _www_auth_param(www_auth, 'realm')
    if realm is None:
        # Without a realm there is no token endpoint to call.
        raise ImageUploaderException(
            'No realm in www-authenticate value: %s' % www_auth)

    token_param = {'offline_token': 'true'}
    service = _www_auth_param(www_auth, 'service')
    if service is not None:
        token_param['service'] = service
    token_param['scope'] = _DEFAULT_SCOPE if scope is None else scope

    auth = None
    if username:
        auth = requests.auth.HTTPBasicAuth(username, password)

    # NOTE(cloudnull): look for errors in our return www_auth data.
    #                  If any are found, purge the token for re-auth.
    if _www_auth_param(www_auth, 'error') is not None:
        session.headers.pop('Authorization', None)

    print(realm, token_param, auth)
    rauth = session.get(realm, params=token_param, auth=auth, timeout=30)
    print(rauth.url)
    rauth.raise_for_status()
    data = rauth.json()

    session.headers['Authorization'] = 'Bearer %s' % data['token']
    # ``issued_at`` is an optional field of the token response; keep the
    # header in sync with the token that was just stored.
    issued_at = data.get('issued_at')
    if issued_at is not None:
        session.headers['issued_at'] = issued_at
    else:
        session.headers.pop('issued_at', None)

    # Remember how to repeat this call so the session can re-authenticate.
    session.authenticate = authenticate
    session.auth_args = dict(
        image_url=image_url,
        username=username,
        password=password,
        session=session,
        scope=scope,
    )
    return session
def _dump_headers(session):
    """Print the headers stored on ``session`` as indented JSON."""
    print(json.dumps(dict(session.headers), indent=4))


# First pass: authenticate against the blob URL from scratch.
print("## AUTH 1")
url = urlparse.urlparse(
    'http://mirror.bhs1.ovh.openstack.org:8082'
    '/v2/tripleomaster/centos-binary-nova-compute'
    '/blobs/sha256:'
    '2efded40b28a63edb701aef3f646be560c1d938199334b173f496db2ca7285b1'
)
auth = authenticate(image_url=url)
# The stored session headers
_dump_headers(auth)

# Second pass: re-run the hook stored on the session with the saved args.
print("## AUTH 2")
# reauth
auth.authenticate(**auth.auth_args)
# The stored session headers should be updated, if the token was expired
_dump_headers(auth)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment