# reauth.py -- GitHub gist by @dmsimard, created Aug 20, 2019.
# Gist description: reauth
import json
import re
from urllib import parse as urlparse
import requests
def _image_tag_from_url(image_url):
    """Split the path of a parsed URL into an ``(image, tag)`` pair.

    The path is split on ``'@'`` when it contains one (digest reference),
    otherwise on ``':'`` (tag reference).  The last piece is the tag; the
    remaining pieces are re-joined with ``':'`` to form the image.

    Args:
        image_url: An object exposing a ``path`` string attribute, such as
            the result of ``urllib.parse.urlparse``.

    Returns:
        A ``(image, tag)`` tuple of strings.  When the path contains
        neither separator, ``image`` is ``''`` and ``tag`` is the whole path.
    """
    path = image_url.path
    # A digest reference ("name@sha256:...") takes priority, because the
    # digest itself contains a ':' that must stay inside the tag.
    separator = '@' if '@' in path else ':'
    parts = path.split(separator)
    return ':'.join(parts[:-1]), parts[-1]
def _build_url(url, path):
    """Rebuild ``scheme://netloc/path`` from a parsed URL.

    Args:
        url: A parsed URL exposing ``scheme``, ``netloc`` and ``path``
            (for example the result of ``urllib.parse.urlparse``).
        path: Accepted so existing callers keep working, but not used:
            the returned URL always carries ``url.path``.

    Returns:
        The URL as a string, without params, query string or fragment.
    """
    return '%s://%s%s' % (url.scheme, url.netloc, url.path)
class ImageUploaderException(Exception):
    """Raised when the registry's authentication challenge is unusable."""


def authenticate(image_url, username=None, password=None, session=None):
    """Return a requests session authorised to access ``image_url``.

    The URL is requested once.  A 200 response means the session is
    already usable and it is returned unchanged.  A 401 response must
    carry a ``Bearer`` ``www-authenticate`` challenge; a token is then
    fetched from the challenge's realm and stored on the session as an
    ``Authorization: Bearer ...`` header.

    After a token has been fetched, the session also gets two attributes
    so that callers can re-run the exchange later with
    ``session.authenticate(**session.auth_args)``:

    * ``authenticate``: this function (a plain function, not a bound
      method, which is why the session is passed via ``auth_args``).
    * ``auth_args``: the keyword arguments of this call, with ``session``
      set to the returned session.

    Args:
        image_url: Parsed URL (``urllib.parse.urlparse`` result) to probe.
        username: Optional user name for HTTP basic auth on the token
            request.
        password: Password used together with ``username``.
        session: Existing ``requests.Session`` to reuse.  When omitted, a
            new session is created with certificate verification disabled.

    Returns:
        The (possibly newly created) ``requests.Session``.

    Raises:
        ImageUploaderException: The response has no ``www-authenticate``
            header, the challenge is not a ``Bearer`` challenge, or it
            has no ``realm``.
        requests.HTTPError: The probe or the token request returned an
            HTTP error status.
        KeyError: The token response has no ``token`` field.
    """
    print("image_url: %s" % str(image_url))
    url = _build_url(image_url, path='/')
    print("url: %s" % str(url))

    if not session:
        session = requests.Session()
        # NOTE(review): TLS certificate verification is switched off for
        # sessions created here.  Acceptable for a debugging script; do
        # not copy this into production code.
        session.verify = False

    r = session.get(url, timeout=30)
    if r.status_code == 200:
        return session
    if r.status_code != 401:
        r.raise_for_status()
    if 'www-authenticate' not in r.headers:
        raise ImageUploaderException(
            'Unknown authentication method for headers: %s' % r.headers)

    www_auth = r.headers['www-authenticate']
    print('www-authenticate', www_auth)
    if not www_auth.startswith('Bearer '):
        raise ImageUploaderException(
            'Unknown www-authenticate value: %s' % www_auth)

    # Without a realm there is nowhere to request a token from; report it
    # as a bad challenge rather than failing with an AttributeError.
    realm_match = re.search('realm="(.*?)"', www_auth)
    if realm_match is None:
        raise ImageUploaderException(
            'Unknown www-authenticate value: %s' % www_auth)
    realm = realm_match.group(1)

    token_param = {'offline_token': 'true'}
    service_match = re.search('service="(.*?)"', www_auth)
    if service_match is not None:
        token_param['service'] = service_match.group(1)
    # The scope is pinned to a single repository.  The generic form would
    # be 'repository:%s:pull' % image[1:], with image taken from
    # _image_tag_from_url(image_url), but for blob URLs of the shape
    # /v2/<repo>/blobs/<digest> that value is not a bare repository name.
    token_param['scope'] = (
        'repository:tripleomaster/centos-binary-nova-compute:pull')

    auth = None
    if username:
        auth = requests.auth.HTTPBasicAuth(username, password)

    # NOTE(cloudnull): look for errors in our return www_auth data.
    # If any are found, purge the token for re-auth.
    if re.search('error="(.*?)"', www_auth) is not None:
        session.headers.pop('Authorization', None)

    print(realm, token_param, auth)
    rauth = session.get(realm, params=token_param, auth=auth, timeout=30)
    print(rauth.url)
    rauth.raise_for_status()
    data = rauth.json()

    session.headers['Authorization'] = 'Bearer %s' % data['token']
    # 'issued_at' is optional in the token response: record it when the
    # server sends it, and drop any stale value from a previous token
    # when it does not.
    if 'issued_at' in data:
        session.headers['issued_at'] = data['issued_at']
    else:
        session.headers.pop('issued_at', None)

    session.authenticate = authenticate
    session.auth_args = dict(
        image_url=image_url,
        username=username,
        password=password,
        session=session,
    )
    return session
# Demo driver: authenticate once and dump the session headers, then call
# the re-authentication hook stored on the session and dump them again.
print("## AUTH 1")
url = urlparse.urlparse(
    'http://mirror.bhs1.ovh.openstack.org:8082'
    '/v2/tripleomaster/centos-binary-nova-compute'
    '/blobs/'
    'sha256:2efded40b28a63edb701aef3f646be560c1d938199334b173f496db2ca7285b1'
)
auth = authenticate(image_url=url)
# Headers held by the session after the first authentication.
print(json.dumps(dict(auth.headers), indent=4))

print("## AUTH 2")
# Re-run authentication with the arguments recorded on the session.
auth.authenticate(**auth.auth_args)
# If the token had expired, these headers differ from the first dump.
print(json.dumps(dict(auth.headers), indent=4))
# End of gist (GitHub page footer text removed).