Last active
December 15, 2017 15:44
-
-
Save giffels/c705666235a452f8db57ef0af215ec31 to your computer and use it in GitHub Desktop.
REST client for services hosted on cmsweb.cern.ch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
from requests.exceptions import HTTPError | |
# Requests in this module are issued with TLS certificate verification
# turned off, so urllib3's warnings are silenced once here at import time.
# NOTE(review): called without a category this presumably mutes every
# urllib3 warning, not only the insecure-request one -- confirm.
requests.packages.urllib3.disable_warnings()
class RESTClient(object):
    """Small HTTP client that authenticates with a client certificate.

    Every request is sent through a ``requests`` session together with the
    configured certificate/key pair and the default headers.  The body of a
    successful response is returned as text.
    """

    # Class-level placeholder.  ``__init__`` binds the session on the
    # *instance*, so each client gets its own session unless this attribute
    # has been pre-set on the class to a session that should be reused.
    _requests_client_session = None

    def __init__(self, certificate, key, default_headers=None, verify=False):
        """Create a client.

        :param certificate: client certificate, passed to ``requests`` as the
            first element of ``cert``
        :param key: private key, passed to ``requests`` as the second element
            of ``cert``
        :param default_headers: optional mapping of headers sent with every
            request; it is copied, so later changes by the caller have no
            effect on this client
        :param verify: forwarded to ``requests`` as ``verify``.  Defaults to
            ``False`` (no server certificate verification), which is what
            this client has always done; pass ``True`` or a CA bundle path to
            enable verification.
        """
        if self._requests_client_session is None:
            self._requests_client_session = requests.Session()
        self.certificate = certificate
        self.key = key
        self.verify = verify
        # Copy so that the caller's dictionary is never shared with us.
        self.headers = dict(default_headers) if default_headers else {}

    def _raise_for_status(self, response):
        """Raise ``HTTPError`` with the server's reply if the status is not ok.

        :param response: the ``requests`` response to check
        :raises HTTPError: carrying the original error text, the response
            body, and the response object itself (``error.response``)
        """
        try:
            response.raise_for_status()
        except HTTPError as http_error:
            # ``.message`` only exists on Python 2 exceptions; ``args`` works
            # on both major versions.
            reason = http_error.args[0] if http_error.args else http_error
            message = u"Server response: %s\nInfos: %s" % (reason,
                                                           response.text)
            if str is bytes:
                # Python 2: exception messages print reliably only as bytes.
                message = message.encode('utf-8')
            raise HTTPError(message, response=response)

    def _request(self, request_func, url, api=None, headers=None, **kwargs):
        """Send one request and return the response body as text.

        :param request_func: callable performing the request (for example a
            bound ``Session.get``); it is called with keyword arguments only
        :param url: base URL of the service
        :param api: optional path appended to ``url``
        :param headers: optional per-request headers; they override the
            default headers on key collisions
        :param kwargs: passed through to ``request_func`` unchanged
        :raises HTTPError: if the response status indicates an error
        """
        request_headers = dict(self.headers)
        if headers:
            request_headers.update(headers)
        if api:
            # Join with exactly one slash, whatever the inputs end/start with.
            url = '%s/%s' % (url.rstrip('/'), api.lstrip('/'))
        response = request_func(url=url,
                                verify=self.verify,
                                cert=(self.certificate, self.key),
                                headers=request_headers,
                                **kwargs)
        self._raise_for_status(response)
        return response.text

    def get(self, url, api=None, headers=None, params=None):
        """Issue a GET request; ``params`` is forwarded as ``params``."""
        return self._request(self._requests_client_session.get, url, api,
                             headers, params=params)

    def post(self, url, api=None, headers=None, data=None):
        """Issue a POST request; ``data`` is forwarded as ``data``."""
        return self._request(self._requests_client_session.post, url, api,
                             headers, data=data)

    def put(self, url, api=None, headers=None, data=None):
        """Issue a PUT request; ``data`` is forwarded as ``data``."""
        return self._request(self._requests_client_session.put, url, api,
                             headers, data=data)
def unflattenJSON(data):
    """Expand a column-oriented payload into one dictionary per result row.

    ``data['desc']['columns']`` supplies the keys, and every entry of
    ``data['result']`` supplies the values for one dictionary.
    """
    column_names = data['desc']['columns']
    records = []
    for values in data['result']:
        records.append(row2dict(column_names, values))
    return records
def row2dict(columns, row):
    """Pair column names with the values of one row and return a dictionary.

    Pairing stops at the shorter of the two inputs.  If a column name occurs
    more than once, the value of its first occurrence is kept.
    """
    record = {}
    for name, value in zip(columns, row):
        if name not in record:
            record[name] = value
    return record
class BaseClient(object):
    """Common base for service clients configured through keyword options.

    Every keyword passed to the constructor becomes an instance attribute.
    ``certificate`` and ``key`` are required (as keywords or as attributes
    already present on the class); ``default_headers`` is optional.
    """

    def __init__(self, **config):
        """Store the configuration and build the underlying ``RESTClient``.

        :param config: arbitrary options, each set as an attribute on the
            instance
        :raises AttributeError: if ``certificate`` or ``key`` is not available
        """
        for option, value in config.items():
            setattr(self, option, value)

        # Fail early with a readable message rather than an anonymous
        # attribute lookup error further down.
        missing = [name for name in ('certificate', 'key')
                   if not hasattr(self, name)]
        if missing:
            raise AttributeError(
                "%s is missing required config option(s): %s"
                % (type(self).__name__, ', '.join(missing)))

        # RESTClient treats default_headers as optional, so do the same here.
        self.default_headers = getattr(self, 'default_headers', None)
        self.rest_client = RESTClient(certificate=self.certificate,
                                      key=self.key,
                                      default_headers=self.default_headers)

    def get_json(self, url, api=None, params=None):
        """GET ``url`` (plus optional ``api``) and decode the body as JSON.

        :param url: base URL handed to the REST client
        :param api: optional API name handed to the REST client
        :param params: optional parameters handed to the REST client
        :return: the object produced by ``json.loads`` on the response text
        """
        body = self.rest_client.get(url=url, api=api, params=params)
        return json.loads(body)
class DBSClient(BaseClient):
    """Client for the service located at the configured ``dbs_url``."""

    def __init__(self, **config):
        """Forward all configuration options to ``BaseClient``."""
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed.
        super(DBSClient, self).__init__(**config)

    def list_files(self, **kwargs):
        """Query the ``files`` API under ``self.dbs_url``.

        :param kwargs: forwarded as ``params`` to ``get_json``
        :return: whatever ``get_json`` returns for the request
        """
        return self.get_json(url=self.dbs_url, api='files', params=kwargs)
class PhEDExClient(BaseClient):
    """Client for the service located at the configured ``phedex_url``.

    Every query method forwards its keyword arguments as ``params`` to
    ``get_json`` and returns whatever ``get_json`` returns.
    """

    def __init__(self, **config):
        """Forward all configuration options to ``BaseClient``."""
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed.
        super(PhEDExClient, self).__init__(**config)

    def list_file_replicas(self, **kwargs):
        """Query the ``filereplicas`` API."""
        return self.get_json(url=self.phedex_url, api='filereplicas',
                             params=kwargs)

    def get_node_usage(self, **kwargs):
        """Query the ``nodeusage`` API."""
        return self.get_json(url=self.phedex_url, api='nodeusage',
                             params=kwargs)

    def get_request_list(self, **kwargs):
        """Query the ``requestlist`` API."""
        return self.get_json(url=self.phedex_url, api='requestlist',
                             params=kwargs)

    def get_subscriptions(self, **kwargs):
        """Query the ``subscriptions`` API."""
        return self.get_json(url=self.phedex_url, api='subscriptions',
                             params=kwargs)

    def get_transfer_requests(self, **kwargs):
        """Query the ``transferrequests`` API."""
        return self.get_json(url=self.phedex_url, api='transferrequests',
                             params=kwargs)
class SiteDBClient(BaseClient):
    """Client for the service located at the configured ``sitedb_url``."""

    def __init__(self, **config):
        """Forward all configuration options to ``BaseClient``."""
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed.
        super(SiteDBClient, self).__init__(**config)

    def user_to_dn(self, username):
        """Query the ``people`` API for entries matching ``username``.

        :param username: value sent as the ``match`` parameter
        :return: list of dictionaries, one per row of the reply, as built by
            ``unflattenJSON``
        """
        reply = self.get_json(url=self.sitedb_url, api='people',
                              params=dict(match=username))
        return unflattenJSON(reply)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment