Created
January 17, 2019 17:38
-
-
Save mpatek/316cccf6ec15721c8f60343e3b5deed7 to your computer and use it in GitHub Desktop.
Adding a decode parameter to Swagger-generated Looker API Python SDK
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
""" | |
Copyright 2016 SmartBear Software | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
ref: https://github.com/swagger-api/swagger-codegen | |
""" | |
from __future__ import absolute_import | |
from . import models | |
from .rest import RESTClientObject | |
from .rest import ApiException | |
import os | |
import re | |
import sys | |
import urllib | |
import json | |
import mimetypes | |
import random | |
import tempfile | |
import threading | |
from datetime import datetime | |
from datetime import date | |
from urllib.parse import quote | |
from .configuration import Configuration | |
class ApiClient(object): | |
""" | |
Generic API client for Swagger client library builds. | |
Swagger generic API client. This client handles the client- | |
server communication, and is invariant across implementations. Specifics of | |
the methods and models for each application are generated from the Swagger | |
templates. | |
NOTE: This class is auto generated by the swagger code generator program. | |
Ref: https://github.com/swagger-api/swagger-codegen | |
Do not edit the class manually. | |
:param host: The base path for the server to call. | |
:param header_name: a header to pass when making calls to the API. | |
:param header_value: a header value to pass when making calls to the API. | |
""" | |
def __init__(self, host=None, header_name=None, header_value=None, cookie=None): | |
""" | |
Constructor of the class. | |
""" | |
self.rest_client = RESTClientObject() | |
self.default_headers = {} | |
if header_name is not None: | |
self.default_headers[header_name] = header_value | |
if host is None: | |
self.host = Configuration().host | |
else: | |
self.host = host | |
self.cookie = cookie | |
# Set default User-Agent. | |
self.user_agent = 'Swagger-Codegen/1.0.0/python' | |
@property | |
def user_agent(self): | |
""" | |
Gets user agent. | |
""" | |
return self.default_headers['User-Agent'] | |
@user_agent.setter | |
def user_agent(self, value): | |
""" | |
Sets user agent. | |
""" | |
self.default_headers['User-Agent'] = value | |
def set_default_header(self, header_name, header_value): | |
self.default_headers[header_name] = header_value | |
def __call_api(self, resource_path, method, | |
path_params=None, query_params=None, header_params=None, | |
body=None, post_params=None, files=None, | |
response_type=None, auth_settings=None, callback=None, | |
decode=True): | |
# headers parameters | |
header_params = header_params or {} | |
header_params.update(self.default_headers) | |
if self.cookie: | |
header_params['Cookie'] = self.cookie | |
if header_params: | |
header_params = self.sanitize_for_serialization(header_params) | |
# path parameters | |
if path_params: | |
path_params = self.sanitize_for_serialization(path_params) | |
for k, v in path_params.items(): | |
replacement = quote(str(self.to_path_value(v))) | |
resource_path = resource_path.\ | |
replace('{' + k + '}', replacement) | |
# query parameters | |
if query_params: | |
query_params = self.sanitize_for_serialization(query_params) | |
query_params = {k: self.to_path_value(v) | |
for k, v in query_params.items()} | |
# post parameters | |
if post_params or files: | |
post_params = self.prepare_post_parameters(post_params, files) | |
post_params = self.sanitize_for_serialization(post_params) | |
# auth setting | |
self.update_params_for_auth(header_params, query_params, auth_settings) | |
# body | |
if body: | |
body = self.sanitize_for_serialization(body) | |
# request url | |
url = self.host + resource_path | |
# perform request and return response | |
response_data = self.request(method, url, | |
query_params=query_params, | |
headers=header_params, | |
post_params=post_params, body=body, | |
decode=decode) | |
self.last_response = response_data | |
# deserialize response data | |
if decode: | |
if response_type: | |
deserialized_data = self.deserialize(response_data, response_type) | |
else: | |
deserialized_data = None | |
else: | |
deserialized_data = response_data.data | |
if callback: | |
callback(deserialized_data) | |
else: | |
return deserialized_data | |
def to_path_value(self, obj): | |
""" | |
Takes value and turn it into a string suitable for inclusion in | |
the path, by url-encoding. | |
:param obj: object or string value. | |
:return string: quoted value. | |
""" | |
if type(obj) == list: | |
return ','.join(obj) | |
else: | |
return str(obj) | |
def sanitize_for_serialization(self, obj): | |
""" | |
Builds a JSON POST object. | |
If obj is None, return None. | |
If obj is str, int, float, bool, return directly. | |
If obj is datetime.datetime, datetime.date | |
convert to string in iso8601 format. | |
If obj is list, sanitize each element in the list. | |
If obj is dict, return the dict. | |
If obj is swagger model, return the properties dict. | |
:param obj: The data to serialize. | |
:return: The serialized form of data. | |
""" | |
types = (str, int, float, bool, tuple) | |
if isinstance(obj, type(None)): | |
return None | |
elif isinstance(obj, types): | |
return obj | |
elif isinstance(obj, list): | |
return [self.sanitize_for_serialization(sub_obj) | |
for sub_obj in obj] | |
elif isinstance(obj, (datetime, date)): | |
return obj.isoformat() | |
else: | |
if isinstance(obj, dict): | |
obj_dict = obj | |
else: | |
# Convert model obj to dict except | |
# attributes `swagger_types`, `attribute_map` | |
# and attributes which value is not None. | |
# Convert attribute name to json key in | |
# model definition for request. | |
obj_dict = {obj.attribute_map[attr]: getattr(obj, attr) | |
for attr, _ in obj.swagger_types.items() | |
if getattr(obj, attr) is not None} | |
return {key: self.sanitize_for_serialization(val) | |
for key, val in obj_dict.items()} | |
def deserialize(self, response, response_type): | |
""" | |
Deserializes response into an object. | |
:param response: RESTResponse object to be deserialized. | |
:param response_type: class literal for | |
deserialzied object, or string of class name. | |
:return: deserialized object. | |
""" | |
# handle file downloading | |
# save response body into a tmp file and return the instance | |
if "file" == response_type: | |
return self.__deserialize_file(response) | |
# fetch data from response object | |
try: | |
data = json.loads(response.data) | |
except ValueError: | |
data = response.data | |
return self.__deserialize(data, response_type) | |
def __deserialize(self, data, klass): | |
""" | |
Deserializes dict, list, str into an object. | |
:param data: dict, list or str. | |
:param klass: class literal, or string of class name. | |
:return: object. | |
""" | |
if data is None: | |
return None | |
if type(klass) == str: | |
if klass.startswith('list['): | |
sub_kls = re.match('list\[(.*)\]', klass).group(1) | |
return [self.__deserialize(sub_data, sub_kls) | |
for sub_data in data] | |
if klass.startswith('dict('): | |
sub_kls = re.match('dict\(([^,]*), (.*)\)', klass).group(2) | |
return {k: self.__deserialize(v, sub_kls) | |
for k, v in data.items()} | |
# convert str to class | |
# for native types | |
if klass in ['int', 'float', 'str', 'bool', | |
"date", 'datetime', "object"]: | |
klass = eval(klass) | |
# for model types | |
else: | |
klass = eval('models.' + klass) | |
if klass in [int, float, str, bool]: | |
return self.__deserialize_primitive(data, klass) | |
elif klass == object: | |
return self.__deserialize_object(data) | |
elif klass == date: | |
return self.__deserialize_date(data) | |
elif klass == datetime: | |
return self.__deserialize_datatime(data) | |
else: | |
return self.__deserialize_model(data, klass) | |
def call_api(self, resource_path, method, | |
path_params=None, query_params=None, header_params=None, | |
body=None, post_params=None, files=None, | |
response_type=None, auth_settings=None, callback=None, | |
decode=True): | |
""" | |
Makes the HTTP request (synchronous) and return the deserialized data. | |
To make an async request, define a function for callback. | |
:param resource_path: Path to method endpoint. | |
:param method: Method to call. | |
:param path_params: Path parameters in the url. | |
:param query_params: Query parameters in the url. | |
:param header_params: Header parameters to be | |
placed in the request header. | |
:param body: Request body. | |
:param post_params dict: Request post form parameters, | |
for `application/x-www-form-urlencoded`, `multipart/form-data`. | |
:param auth_settings list: Auth Settings names for the request. | |
:param response: Response data type. | |
:param files dict: key -> filename, value -> filepath, | |
for `multipart/form-data`. | |
:param callback function: Callback function for asynchronous request. | |
If provide this parameter, | |
the request will be called asynchronously. | |
:param decode: Whether to decode binary response data as text. Default | |
is True. | |
:return: | |
If provide parameter callback, | |
the request will be called asynchronously. | |
The method will return the request thread. | |
If parameter callback is None, | |
then the method will return the response directly. | |
""" | |
if callback is None: | |
return self.__call_api(resource_path, method, | |
path_params, query_params, header_params, | |
body, post_params, files, | |
response_type, auth_settings, callback, | |
decode) | |
else: | |
thread = threading.Thread(target=self.__call_api, | |
args=(resource_path, method, | |
path_params, query_params, | |
header_params, body, | |
post_params, files, | |
response_type, auth_settings, | |
callback, decode)) | |
thread.start() | |
return thread | |
def request(self, method, url, query_params=None, headers=None, | |
post_params=None, body=None, decode=True): | |
""" | |
Makes the HTTP request using RESTClient. | |
""" | |
if method == "GET": | |
return self.rest_client.GET( | |
url, | |
query_params=query_params, | |
headers=headers, | |
decode=decode, | |
) | |
elif method == "HEAD": | |
return self.rest_client.HEAD( | |
url, | |
query_params=query_params, | |
headers=headers, | |
decode=decode, | |
) | |
elif method == "OPTIONS": | |
return self.rest_client.OPTIONS( | |
url, | |
query_params=query_params, | |
headers=headers, | |
post_params=post_params, | |
body=body, | |
decode=decode, | |
) | |
elif method == "POST": | |
return self.rest_client.POST( | |
url, | |
query_params=query_params, | |
headers=headers, | |
post_params=post_params, | |
body=body, | |
decode=decode, | |
) | |
elif method == "PUT": | |
return self.rest_client.PUT( | |
url, | |
query_params=query_params, | |
headers=headers, | |
post_params=post_params, | |
body=body, | |
decode=decode, | |
) | |
elif method == "PATCH": | |
return self.rest_client.PATCH( | |
url, | |
query_params=query_params, | |
headers=headers, | |
post_params=post_params, | |
body=body, | |
decode=decode, | |
) | |
elif method == "DELETE": | |
return self.rest_client.DELETE( | |
url, | |
query_params=query_params, | |
headers=headers, | |
decode=decode, | |
) | |
else: | |
raise ValueError( | |
"http method must be `GET`, `HEAD`," | |
" `POST`, `PATCH`, `PUT` or `DELETE`." | |
) | |
def prepare_post_parameters(self, post_params=None, files=None): | |
""" | |
Builds form parameters. | |
:param post_params: Normal form parameters. | |
:param files: File parameters. | |
:return: Form parameters with files. | |
""" | |
params = [] | |
if post_params: | |
params = post_params | |
if files: | |
for k, v in files.items(): | |
if not v: | |
continue | |
file_names = v if type(v) is list else [v] | |
for n in file_names: | |
with open(n, 'rb') as f: | |
filename = os.path.basename(f.name) | |
filedata = f.read() | |
mimetype = mimetypes.\ | |
guess_type(filename)[0] or 'application/octet-stream' | |
params.append(tuple([k, tuple([filename, filedata, mimetype])])) | |
return params | |
def select_header_accept(self, accepts): | |
""" | |
Returns `Accept` based on an array of accepts provided. | |
:param accepts: List of headers. | |
:return: Accept (e.g. application/json). | |
""" | |
if not accepts: | |
return | |
accepts = list(map(lambda x: x.lower(), accepts)) | |
if 'application/json' in accepts: | |
return 'application/json' | |
else: | |
return ', '.join(accepts) | |
def select_header_content_type(self, content_types): | |
""" | |
Returns `Content-Type` based on an array of content_types provided. | |
:param content_types: List of content-types. | |
:return: Content-Type (e.g. application/json). | |
""" | |
if not content_types: | |
return 'application/json' | |
content_types = list(map(lambda x: x.lower(), content_types)) | |
if 'application/json' in content_types: | |
return 'application/json' | |
else: | |
return content_types[0] | |
def update_params_for_auth(self, headers, querys, auth_settings): | |
""" | |
Updates header and query params based on authentication setting. | |
:param headers: Header parameters dict to be updated. | |
:param querys: Query parameters dict to be updated. | |
:param auth_settings: Authentication setting identifiers list. | |
""" | |
config = Configuration() | |
if not auth_settings: | |
return | |
for auth in auth_settings: | |
auth_setting = config.auth_settings().get(auth) | |
if auth_setting: | |
if not auth_setting['value']: | |
continue | |
elif auth_setting['in'] == 'header': | |
headers[auth_setting['key']] = auth_setting['value'] | |
elif auth_setting['in'] == 'query': | |
querys[auth_setting['key']] = auth_setting['value'] | |
else: | |
raise ValueError( | |
'Authentication token must be in `query` or `header`' | |
) | |
def __deserialize_file(self, response): | |
""" | |
Saves response body into a file in a temporary folder, | |
using the filename from the `Content-Disposition` header if provided. | |
:param response: RESTResponse. | |
:return: file path. | |
""" | |
config = Configuration() | |
fd, path = tempfile.mkstemp(dir=config.temp_folder_path) | |
os.close(fd) | |
os.remove(path) | |
content_disposition = response.getheader("Content-Disposition") | |
if content_disposition: | |
filename = re.\ | |
search(r'filename=[\'"]?([^\'"\s]+)[\'"]?', content_disposition).\ | |
group(1) | |
path = os.path.join(os.path.dirname(path), filename) | |
with open(path, "w") as f: | |
f.write(response.data) | |
return path | |
def __deserialize_primitive(self, data, klass): | |
""" | |
Deserializes string to primitive type. | |
:param data: str. | |
:param klass: class literal. | |
:return: int, float, str, bool. | |
""" | |
try: | |
value = klass(data) | |
except UnicodeEncodeError: | |
value = str(data) | |
except TypeError: | |
value = data | |
return value | |
def __deserialize_object(self, value): | |
""" | |
Return a original value. | |
:return: object. | |
""" | |
return value | |
def __deserialize_date(self, string): | |
""" | |
Deserializes string to date. | |
:param string: str. | |
:return: date. | |
""" | |
try: | |
from dateutil.parser import parse | |
return parse(string).date() | |
except ImportError: | |
return string | |
except ValueError: | |
raise ApiException( | |
status=0, | |
reason="Failed to parse `{0}` into a date object" | |
.format(string) | |
) | |
def __deserialize_datatime(self, string): | |
""" | |
Deserializes string to datetime. | |
The string should be in iso8601 datetime format. | |
:param string: str. | |
:return: datetime. | |
""" | |
try: | |
from dateutil.parser import parse | |
return parse(string) | |
except ImportError: | |
return string | |
except ValueError: | |
raise ApiException( | |
status=0, | |
reason="Failed to parse `{0}` into a datetime object". | |
format(string) | |
) | |
def __deserialize_model(self, data, klass): | |
""" | |
Deserializes list or dict to model. | |
:param data: dict, list. | |
:param klass: class literal. | |
:return: model object. | |
""" | |
instance = klass() | |
for attr, attr_type in instance.swagger_types.items(): | |
if data is not None \ | |
and instance.attribute_map[attr] in data\ | |
and isinstance(data, (list, dict)): | |
value = data[instance.attribute_map[attr]] | |
setattr(instance, attr, self.__deserialize(value, attr_type)) | |
return instance |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
""" | |
RenderTaskApi.py | |
Copyright 2016 SmartBear Software | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
""" | |
from __future__ import absolute_import | |
import sys | |
import os | |
# python 2 and python 3 compatibility library | |
from six import iteritems | |
from ..configuration import Configuration | |
from ..api_client import ApiClient | |
class RenderTaskApi(object): | |
""" | |
NOTE: This class is auto generated by the swagger code generator program. | |
Do not edit the class manually. | |
Ref: https://github.com/swagger-api/swagger-codegen | |
""" | |
def __init__(self, api_client=None): | |
config = Configuration() | |
if api_client: | |
self.api_client = api_client | |
else: | |
if not config.api_client: | |
config.api_client = ApiClient() | |
self.api_client = config.api_client | |
def create_dashboard_render_task(self, dashboard_id, result_format, body, width, height, **kwargs): | |
""" | |
Create Dashboard Render Task | |
### Create a new task to render a dashboard to a document or image.\n\nReturns a render task object.\nTo check the status of a render task, pass the render_task.id to [Get Render Task](#!/RenderTask/get_render_task).\nOnce the render task is complete, you can download the resulting document or image using [Get Render Task Results](#!/RenderTask/get_render_task_results).\n\n | |
This method makes a synchronous HTTP request by default. To make an | |
asynchronous HTTP request, please define a `callback` function | |
to be invoked when receiving the response. | |
>>> def callback_function(response): | |
>>> pprint(response) | |
>>> | |
>>> thread = api.create_dashboard_render_task(dashboard_id, result_format, body, width, height, callback=callback_function) | |
:param callback function: The callback function | |
for asynchronous request. (optional) | |
:param int dashboard_id: Id of dashboard to render (required) | |
:param str result_format: Output type: pdf, png, or jpg (required) | |
:param CreateDashboardRenderTask body: Dashboard render task parameters (required) | |
:param int width: Output width in pixels (required) | |
:param int height: Output height in pixels (required) | |
:param str fields: Requested fields. | |
:param str pdf_paper_size: Paper size for pdf | |
:param bool pdf_landscape: Whether to render pdf in landscape | |
:return: RenderTask | |
If the method is called asynchronously, | |
returns the request thread. | |
""" | |
all_params = ['dashboard_id', 'result_format', 'body', 'width', 'height', 'fields', 'pdf_paper_size', 'pdf_landscape'] | |
all_params.append('callback') | |
params = locals() | |
for key, val in iteritems(params['kwargs']): | |
if key not in all_params: | |
raise TypeError( | |
"Got an unexpected keyword argument '%s'" | |
" to method create_dashboard_render_task" % key | |
) | |
params[key] = val | |
del params['kwargs'] | |
# verify the required parameter 'dashboard_id' is set | |
if ('dashboard_id' not in params) or (params['dashboard_id'] is None): | |
raise ValueError("Missing the required parameter `dashboard_id` when calling `create_dashboard_render_task`") | |
# verify the required parameter 'result_format' is set | |
if ('result_format' not in params) or (params['result_format'] is None): | |
raise ValueError("Missing the required parameter `result_format` when calling `create_dashboard_render_task`") | |
# verify the required parameter 'body' is set | |
if ('body' not in params) or (params['body'] is None): | |
raise ValueError("Missing the required parameter `body` when calling `create_dashboard_render_task`") | |
# verify the required parameter 'width' is set | |
if ('width' not in params) or (params['width'] is None): | |
raise ValueError("Missing the required parameter `width` when calling `create_dashboard_render_task`") | |
# verify the required parameter 'height' is set | |
if ('height' not in params) or (params['height'] is None): | |
raise ValueError("Missing the required parameter `height` when calling `create_dashboard_render_task`") | |
resource_path = '/render_tasks/dashboards/{dashboard_id}/{result_format}'.replace('{format}', 'json') | |
path_params = {} | |
if 'dashboard_id' in params: | |
path_params['dashboard_id'] = params['dashboard_id'] | |
if 'result_format' in params: | |
path_params['result_format'] = params['result_format'] | |
query_params = {} | |
if 'width' in params: | |
query_params['width'] = params['width'] | |
if 'height' in params: | |
query_params['height'] = params['height'] | |
if 'fields' in params: | |
query_params['fields'] = params['fields'] | |
if 'pdf_paper_size' in params: | |
query_params['pdf_paper_size'] = params['pdf_paper_size'] | |
if 'pdf_landscape' in params: | |
query_params['pdf_landscape'] = params['pdf_landscape'] | |
header_params = {} | |
form_params = [] | |
local_var_files = {} | |
body_params = None | |
if 'body' in params: | |
body_params = params['body'] | |
# HTTP header `Accept` | |
header_params['Accept'] = self.api_client.\ | |
select_header_accept(['application/json']) | |
if not header_params['Accept']: | |
del header_params['Accept'] | |
# HTTP header `Content-Type` | |
header_params['Content-Type'] = self.api_client.\ | |
select_header_content_type(['application/json']) | |
# Authentication setting | |
auth_settings = [] | |
response = self.api_client.call_api(resource_path, 'POST', | |
path_params, | |
query_params, | |
header_params, | |
body=body_params, | |
post_params=form_params, | |
files=local_var_files, | |
response_type='RenderTask', | |
auth_settings=auth_settings, | |
callback=params.get('callback')) | |
return response | |
def create_look_render_task(self, look_id, result_format, width, height, **kwargs): | |
""" | |
Create Look Render Task | |
### Create a new task to render a look to an image.\n\nReturns a render task object.\nTo check the status of a render task, pass the render_task.id to [Get Render Task](#!/RenderTask/get_render_task).\nOnce the render task is complete, you can download the resulting document or image using [Get Render Task Results](#!/RenderTask/get_render_task_results).\n\n | |
This method makes a synchronous HTTP request by default. To make an | |
asynchronous HTTP request, please define a `callback` function | |
to be invoked when receiving the response. | |
>>> def callback_function(response): | |
>>> pprint(response) | |
>>> | |
>>> thread = api.create_look_render_task(look_id, result_format, width, height, callback=callback_function) | |
:param callback function: The callback function | |
for asynchronous request. (optional) | |
:param int look_id: Id of look to render (required) | |
:param str result_format: Output type: png, or jpg (required) | |
:param int width: Output width in pixels (required) | |
:param int height: Output height in pixels (required) | |
:param str fields: Requested fields. | |
:return: RenderTask | |
If the method is called asynchronously, | |
returns the request thread. | |
""" | |
all_params = ['look_id', 'result_format', 'width', 'height', 'fields'] | |
all_params.append('callback') | |
params = locals() | |
for key, val in iteritems(params['kwargs']): | |
if key not in all_params: | |
raise TypeError( | |
"Got an unexpected keyword argument '%s'" | |
" to method create_look_render_task" % key | |
) | |
params[key] = val | |
del params['kwargs'] | |
# verify the required parameter 'look_id' is set | |
if ('look_id' not in params) or (params['look_id'] is None): | |
raise ValueError("Missing the required parameter `look_id` when calling `create_look_render_task`") | |
# verify the required parameter 'result_format' is set | |
if ('result_format' not in params) or (params['result_format'] is None): | |
raise ValueError("Missing the required parameter `result_format` when calling `create_look_render_task`") | |
# verify the required parameter 'width' is set | |
if ('width' not in params) or (params['width'] is None): | |
raise ValueError("Missing the required parameter `width` when calling `create_look_render_task`") | |
# verify the required parameter 'height' is set | |
if ('height' not in params) or (params['height'] is None): | |
raise ValueError("Missing the required parameter `height` when calling `create_look_render_task`") | |
resource_path = '/render_tasks/looks/{look_id}/{result_format}'.replace('{format}', 'json') | |
path_params = {} | |
if 'look_id' in params: | |
path_params['look_id'] = params['look_id'] | |
if 'result_format' in params: | |
path_params['result_format'] = params['result_format'] | |
query_params = {} | |
if 'width' in params: | |
query_params['width'] = params['width'] | |
if 'height' in params: | |
query_params['height'] = params['height'] | |
if 'fields' in params: | |
query_params['fields'] = params['fields'] | |
header_params = {} | |
form_params = [] | |
local_var_files = {} | |
body_params = None | |
# HTTP header `Accept` | |
header_params['Accept'] = self.api_client.\ | |
select_header_accept(['application/json']) | |
if not header_params['Accept']: | |
del header_params['Accept'] | |
# HTTP header `Content-Type` | |
header_params['Content-Type'] = self.api_client.\ | |
select_header_content_type(['application/json']) | |
# Authentication setting | |
auth_settings = [] | |
response = self.api_client.call_api(resource_path, 'POST', | |
path_params, | |
query_params, | |
header_params, | |
body=body_params, | |
post_params=form_params, | |
files=local_var_files, | |
response_type='RenderTask', | |
auth_settings=auth_settings, | |
callback=params.get('callback')) | |
return response | |
def create_lookml_dashboard_render_task(self, dashboard_id, result_format, body, width, height, **kwargs): | |
""" | |
Create Lookml Dashboard Render Task | |
### Create a new task to render a lookml dashboard to a document or image.\n\nReturns a render task object.\nTo check the status of a render task, pass the render_task.id to [Get Render Task](#!/RenderTask/get_render_task).\nOnce the render task is complete, you can download the resulting document or image using [Get Render Task Results](#!/RenderTask/get_render_task_results).\n\n | |
This method makes a synchronous HTTP request by default. To make an | |
asynchronous HTTP request, please define a `callback` function | |
to be invoked when receiving the response. | |
>>> def callback_function(response): | |
>>> pprint(response) | |
>>> | |
>>> thread = api.create_lookml_dashboard_render_task(dashboard_id, result_format, body, width, height, callback=callback_function) | |
:param callback function: The callback function | |
for asynchronous request. (optional) | |
:param str dashboard_id: Id of lookml dashboard to render (required) | |
:param str result_format: Output type: pdf, png, or jpg (required) | |
:param CreateDashboardRenderTask body: Dashboard render task parameters (required) | |
:param int width: Output width in pixels (required) | |
:param int height: Output height in pixels (required) | |
:param str fields: Requested fields. | |
:param str pdf_paper_size: Paper size for pdf | |
:param bool pdf_landscape: Whether to render pdf in landscape | |
:return: RenderTask | |
If the method is called asynchronously, | |
returns the request thread. | |
""" | |
all_params = ['dashboard_id', 'result_format', 'body', 'width', 'height', 'fields', 'pdf_paper_size', 'pdf_landscape'] | |
all_params.append('callback') | |
params = locals() | |
for key, val in iteritems(params['kwargs']): | |
if key not in all_params: | |
raise TypeError( | |
"Got an unexpected keyword argument '%s'" | |
" to method create_lookml_dashboard_render_task" % key | |
) | |
params[key] = val | |
del params['kwargs'] | |
# verify the required parameter 'dashboard_id' is set | |
if ('dashboard_id' not in params) or (params['dashboard_id'] is None): | |
raise ValueError("Missing the required parameter `dashboard_id` when calling `create_lookml_dashboard_render_task`") | |
# verify the required parameter 'result_format' is set | |
if ('result_format' not in params) or (params['result_format'] is None): | |
raise ValueError("Missing the required parameter `result_format` when calling `create_lookml_dashboard_render_task`") | |
# verify the required parameter 'body' is set | |
if ('body' not in params) or (params['body'] is None): | |
raise ValueError("Missing the required parameter `body` when calling `create_lookml_dashboard_render_task`") | |
# verify the required parameter 'width' is set | |
if ('width' not in params) or (params['width'] is None): | |
raise ValueError("Missing the required parameter `width` when calling `create_lookml_dashboard_render_task`") | |
# verify the required parameter 'height' is set | |
if ('height' not in params) or (params['height'] is None): | |
raise ValueError("Missing the required parameter `height` when calling `create_lookml_dashboard_render_task`") | |
resource_path = '/render_tasks/lookml_dashboards/{dashboard_id}/{result_format}'.replace('{format}', 'json') | |
path_params = {} | |
if 'dashboard_id' in params: | |
path_params['dashboard_id'] = params['dashboard_id'] | |
if 'result_format' in params: | |
path_params['result_format'] = params['result_format'] | |
query_params = {} | |
if 'width' in params: | |
query_params['width'] = params['width'] | |
if 'height' in params: | |
query_params['height'] = params['height'] | |
if 'fields' in params: | |
query_params['fields'] = params['fields'] | |
if 'pdf_paper_size' in params: | |
query_params['pdf_paper_size'] = params['pdf_paper_size'] | |
if 'pdf_landscape' in params: | |
query_params['pdf_landscape'] = params['pdf_landscape'] | |
header_params = {} | |
form_params = [] | |
local_var_files = {} | |
body_params = None | |
if 'body' in params: | |
body_params = params['body'] | |
# HTTP header `Accept` | |
header_params['Accept'] = self.api_client.\ | |
select_header_accept(['application/json']) | |
if not header_params['Accept']: | |
del header_params['Accept'] | |
# HTTP header `Content-Type` | |
header_params['Content-Type'] = self.api_client.\ | |
select_header_content_type(['application/json']) | |
# Authentication setting | |
auth_settings = [] | |
response = self.api_client.call_api(resource_path, 'POST', | |
path_params, | |
query_params, | |
header_params, | |
body=body_params, | |
post_params=form_params, | |
files=local_var_files, | |
response_type='RenderTask', | |
auth_settings=auth_settings, | |
callback=params.get('callback')) | |
return response | |
def create_query_render_task(self, query_id, result_format, width, height, **kwargs): | |
""" | |
Create Query Render Task | |
### Create a new task to render an existing query to an image.\n\nReturns a render task object.\nTo check the status of a render task, pass the render_task.id to [Get Render Task](#!/RenderTask/get_render_task).\nOnce the render task is complete, you can download the resulting document or image using [Get Render Task Results](#!/RenderTask/get_render_task_results).\n\n | |
This method makes a synchronous HTTP request by default. To make an | |
asynchronous HTTP request, please define a `callback` function | |
to be invoked when receiving the response. | |
>>> def callback_function(response): | |
>>> pprint(response) | |
>>> | |
>>> thread = api.create_query_render_task(query_id, result_format, width, height, callback=callback_function) | |
:param callback function: The callback function | |
for asynchronous request. (optional) | |
:param int query_id: Id of the query to render (required) | |
:param str result_format: Output type: png or jpg (required) | |
:param int width: Output width in pixels (required) | |
:param int height: Output height in pixels (required) | |
:param str fields: Requested fields. | |
:return: RenderTask | |
If the method is called asynchronously, | |
returns the request thread. | |
""" | |
all_params = ['query_id', 'result_format', 'width', 'height', 'fields'] | |
all_params.append('callback') | |
params = locals() | |
for key, val in iteritems(params['kwargs']): | |
if key not in all_params: | |
raise TypeError( | |
"Got an unexpected keyword argument '%s'" | |
" to method create_query_render_task" % key | |
) | |
params[key] = val | |
del params['kwargs'] | |
# verify the required parameter 'query_id' is set | |
if ('query_id' not in params) or (params['query_id'] is None): | |
raise ValueError("Missing the required parameter `query_id` when calling `create_query_render_task`") | |
# verify the required parameter 'result_format' is set | |
if ('result_format' not in params) or (params['result_format'] is None): | |
raise ValueError("Missing the required parameter `result_format` when calling `create_query_render_task`") | |
# verify the required parameter 'width' is set | |
if ('width' not in params) or (params['width'] is None): | |
raise ValueError("Missing the required parameter `width` when calling `create_query_render_task`") | |
# verify the required parameter 'height' is set | |
if ('height' not in params) or (params['height'] is None): | |
raise ValueError("Missing the required parameter `height` when calling `create_query_render_task`") | |
resource_path = '/render_tasks/queries/{query_id}/{result_format}'.replace('{format}', 'json') | |
path_params = {} | |
if 'query_id' in params: | |
path_params['query_id'] = params['query_id'] | |
if 'result_format' in params: | |
path_params['result_format'] = params['result_format'] | |
query_params = {} | |
if 'width' in params: | |
query_params['width'] = params['width'] | |
if 'height' in params: | |
query_params['height'] = params['height'] | |
if 'fields' in params: | |
query_params['fields'] = params['fields'] | |
header_params = {} | |
form_params = [] | |
local_var_files = {} | |
body_params = None | |
# HTTP header `Accept` | |
header_params['Accept'] = self.api_client.\ | |
select_header_accept(['application/json']) | |
if not header_params['Accept']: | |
del header_params['Accept'] | |
# HTTP header `Content-Type` | |
header_params['Content-Type'] = self.api_client.\ | |
select_header_content_type(['application/json']) | |
# Authentication setting | |
auth_settings = [] | |
response = self.api_client.call_api(resource_path, 'POST', | |
path_params, | |
query_params, | |
header_params, | |
body=body_params, | |
post_params=form_params, | |
files=local_var_files, | |
response_type='RenderTask', | |
auth_settings=auth_settings, | |
callback=params.get('callback')) | |
return response | |
def render_task(self, render_task_id, **kwargs): | |
""" | |
Get Render Task | |
### Get information about a render task.\n\nReturns a render task object.\nTo check the status of a render task, pass the render_task.id to [Get Render Task](#!/RenderTask/get_render_task).\nOnce the render task is complete, you can download the resulting document or image using [Get Render Task Results](#!/RenderTask/get_render_task_results).\n\n | |
This method makes a synchronous HTTP request by default. To make an | |
asynchronous HTTP request, please define a `callback` function | |
to be invoked when receiving the response. | |
>>> def callback_function(response): | |
>>> pprint(response) | |
>>> | |
>>> thread = api.render_task(render_task_id, callback=callback_function) | |
:param callback function: The callback function | |
for asynchronous request. (optional) | |
:param str render_task_id: Id of render task (required) | |
:param str fields: Requested fields. | |
:return: RenderTask | |
If the method is called asynchronously, | |
returns the request thread. | |
""" | |
all_params = ['render_task_id', 'fields'] | |
all_params.append('callback') | |
params = locals() | |
for key, val in iteritems(params['kwargs']): | |
if key not in all_params: | |
raise TypeError( | |
"Got an unexpected keyword argument '%s'" | |
" to method render_task" % key | |
) | |
params[key] = val | |
del params['kwargs'] | |
# verify the required parameter 'render_task_id' is set | |
if ('render_task_id' not in params) or (params['render_task_id'] is None): | |
raise ValueError("Missing the required parameter `render_task_id` when calling `render_task`") | |
resource_path = '/render_tasks/{render_task_id}'.replace('{format}', 'json') | |
path_params = {} | |
if 'render_task_id' in params: | |
path_params['render_task_id'] = params['render_task_id'] | |
query_params = {} | |
if 'fields' in params: | |
query_params['fields'] = params['fields'] | |
header_params = {} | |
form_params = [] | |
local_var_files = {} | |
body_params = None | |
# HTTP header `Accept` | |
header_params['Accept'] = self.api_client.\ | |
select_header_accept(['application/json']) | |
if not header_params['Accept']: | |
del header_params['Accept'] | |
# HTTP header `Content-Type` | |
header_params['Content-Type'] = self.api_client.\ | |
select_header_content_type(['application/json']) | |
# Authentication setting | |
auth_settings = [] | |
response = self.api_client.call_api(resource_path, 'GET', | |
path_params, | |
query_params, | |
header_params, | |
body=body_params, | |
post_params=form_params, | |
files=local_var_files, | |
response_type='RenderTask', | |
auth_settings=auth_settings, | |
callback=params.get('callback')) | |
return response | |
def render_task_results(self, render_task_id, **kwargs): | |
""" | |
Render Task Results | |
### Get the document or image produced by a completed render task.\n\nReturns `102 Processing` if the render task has not completed.\n | |
This method makes a synchronous HTTP request by default. To make an | |
asynchronous HTTP request, please define a `callback` function | |
to be invoked when receiving the response. | |
>>> def callback_function(response): | |
>>> pprint(response) | |
>>> | |
>>> thread = api.render_task_results(render_task_id, callback=callback_function) | |
:param callback function: The callback function | |
for asynchronous request. (optional) | |
:param str render_task_id: Id of render task (required) | |
:return: str | |
If the method is called asynchronously, | |
returns the request thread. | |
""" | |
all_params = ['render_task_id'] | |
all_params.append('callback') | |
params = locals() | |
for key, val in iteritems(params['kwargs']): | |
if key not in all_params: | |
raise TypeError( | |
"Got an unexpected keyword argument '%s'" | |
" to method render_task_results" % key | |
) | |
params[key] = val | |
del params['kwargs'] | |
# verify the required parameter 'render_task_id' is set | |
if ('render_task_id' not in params) or (params['render_task_id'] is None): | |
raise ValueError("Missing the required parameter `render_task_id` when calling `render_task_results`") | |
resource_path = '/render_tasks/{render_task_id}/results'.replace('{format}', 'json') | |
path_params = {} | |
if 'render_task_id' in params: | |
path_params['render_task_id'] = params['render_task_id'] | |
query_params = {} | |
header_params = {} | |
form_params = [] | |
local_var_files = {} | |
body_params = None | |
# HTTP header `Accept` | |
header_params['Accept'] = self.api_client.\ | |
select_header_accept(['image/jpeg', 'image/png', 'application/pdf']) | |
if not header_params['Accept']: | |
del header_params['Accept'] | |
# HTTP header `Content-Type` | |
header_params['Content-Type'] = self.api_client.\ | |
select_header_content_type(['application/json']) | |
# Authentication setting | |
auth_settings = [] | |
response = self.api_client.call_api(resource_path, 'GET', | |
path_params, | |
query_params, | |
header_params, | |
body=body_params, | |
post_params=form_params, | |
files=local_var_files, | |
response_type='str', | |
auth_settings=auth_settings, | |
decode=False, | |
callback=params.get('callback')) | |
return response |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
""" | |
Copyright 2016 SmartBear Software | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
Credit: this file (rest.py) is modified based on rest.py in Dropbox Python SDK: | |
https://www.dropbox.com/developers/core/sdks/python | |
""" | |
from __future__ import absolute_import | |
import sys | |
import io | |
import json | |
import ssl | |
import certifi | |
import logging | |
from .configuration import Configuration | |
import urllib3 | |
from urllib.parse import urlencode | |
logger = logging.getLogger(__name__) | |
class RESTResponse(io.IOBase): | |
def __init__(self, resp): | |
self.urllib3_response = resp | |
self.status = resp.status | |
self.reason = resp.reason | |
self.data = resp.data | |
def getheaders(self): | |
""" | |
Returns a dictionary of the response headers. | |
""" | |
return self.urllib3_response.getheaders() | |
def getheader(self, name, default=None): | |
""" | |
Returns a given response header. | |
""" | |
return self.urllib3_response.getheader(name, default) | |
class RESTClientObject(object): | |
def __init__(self, pools_size=4, maxsize=20): | |
# urllib3.PoolManager will pass all kw parameters to connectionpool | |
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 | |
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 | |
# ca_certs vs cert_file vs key_file | |
# http://stackoverflow.com/a/23957365/2985775 | |
# cert_reqs | |
if Configuration().verify_ssl: | |
cert_reqs = ssl.CERT_REQUIRED | |
else: | |
cert_reqs = ssl.CERT_NONE | |
# ca_certs | |
if Configuration().ssl_ca_cert: | |
ca_certs = Configuration().ssl_ca_cert | |
else: | |
# if not set certificate file, use Mozilla's root certificates. | |
ca_certs = certifi.where() | |
# cert_file | |
cert_file = Configuration().cert_file | |
# https pool manager | |
self.pool_manager = urllib3.PoolManager( | |
num_pools=pools_size, | |
maxsize=maxsize, | |
cert_reqs=cert_reqs, | |
ca_certs=ca_certs, | |
cert_file=cert_file, | |
) | |
def request(self, method, url, query_params=None, headers=None, | |
body=None, post_params=None, decode=True): | |
""" | |
:param method: http request method | |
:param url: http request url | |
:param query_params: query parameters in the url | |
:param headers: http request headers | |
:param body: request json body, for `application/json` | |
:param post_params: request post parameters, | |
`application/x-www-form-urlencode` | |
and `multipart/form-data` | |
""" | |
method = method.upper() | |
assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT', 'PATCH', 'OPTIONS'] | |
if post_params and body: | |
raise ValueError( | |
"body parameter cannot be used with post_params parameter." | |
) | |
post_params = post_params or {} | |
headers = headers or {} | |
if 'Content-Type' not in headers: | |
headers['Content-Type'] = 'application/json' | |
try: | |
# For `POST`, `PUT`, `PATCH`, `OPTIONS` | |
if method in ['POST', 'PUT', 'PATCH', 'OPTIONS']: | |
if query_params: | |
url += '?' + urlencode(query_params) | |
if headers['Content-Type'] == 'application/json': | |
r = self.pool_manager.request(method, url, | |
body=json.dumps(body), | |
headers=headers) | |
if headers['Content-Type'] == 'application/x-www-form-urlencoded': | |
r = self.pool_manager.request(method, url, | |
fields=post_params, | |
encode_multipart=False, | |
headers=headers) | |
if headers['Content-Type'] == 'multipart/form-data': | |
# must del headers['Content-Type'], or the correct Content-Type | |
# which generated by urllib3 will be overwritten. | |
del headers['Content-Type'] | |
r = self.pool_manager.request(method, url, | |
fields=post_params, | |
encode_multipart=True, | |
headers=headers) | |
# For `GET`, `HEAD`, `DELETE` | |
else: | |
r = self.pool_manager.request(method, url, | |
fields=query_params, | |
headers=headers) | |
except urllib3.exceptions.SSLError as e: | |
msg = "{0}\n{1}".format(type(e).__name__, str(e)) | |
raise ApiException(status=0, reason=msg) | |
r = RESTResponse(r) | |
# In the python 3, the response.data is bytes. | |
# we need to decode it to string. | |
if sys.version_info > (3,) and decode: | |
r.data = r.data.decode('utf8') | |
# log response body | |
logger.debug("response body: %s" % r.data) | |
if r.status not in range(200, 206): | |
raise ApiException(http_resp=r) | |
return r | |
def GET(self, url, headers=None, query_params=None, decode=True): | |
return self.request("GET", url, | |
headers=headers, | |
query_params=query_params, | |
decode=decode) | |
def HEAD(self, url, headers=None, query_params=None, decode=True): | |
return self.request("HEAD", url, | |
headers=headers, | |
query_params=query_params, | |
decode=decode) | |
def OPTIONS(self, url, headers=None, query_params=None, post_params=None, | |
body=None, decode=True): | |
return self.request("OPTIONS", url, | |
headers=headers, | |
query_params=query_params, | |
post_params=post_params, | |
body=body, | |
decode=decode) | |
def DELETE(self, url, headers=None, query_params=None, decode=True): | |
return self.request("DELETE", url, | |
headers=headers, | |
query_params=query_params, | |
decode=decode) | |
def POST(self, url, headers=None, query_params=None, post_params=None, | |
body=None, decode=True): | |
return self.request("POST", url, | |
headers=headers, | |
query_params=query_params, | |
post_params=post_params, | |
body=body, | |
decode=decode) | |
def PUT(self, url, headers=None, query_params=None, post_params=None, | |
body=None, decode=True): | |
return self.request("PUT", url, | |
headers=headers, | |
query_params=query_params, | |
post_params=post_params, | |
body=body, | |
decode=decode) | |
def PATCH(self, url, headers=None, query_params=None, post_params=None, | |
body=None, decode=True): | |
return self.request("PATCH", url, | |
headers=headers, | |
query_params=query_params, | |
post_params=post_params, | |
body=body, | |
decode=decode) | |
class ApiException(Exception): | |
def __init__(self, status=None, reason=None, http_resp=None): | |
if http_resp: | |
self.status = http_resp.status | |
self.reason = http_resp.reason | |
self.body = http_resp.data | |
self.headers = http_resp.getheaders() | |
else: | |
self.status = status | |
self.reason = reason | |
self.body = None | |
self.headers = None | |
def __str__(self): | |
""" | |
Custom error messages for exception | |
""" | |
error_message = "({0})\n"\ | |
"Reason: {1}\n".format(self.status, self.reason) | |
if self.headers: | |
error_message += "HTTP response headers: {0}\n".format(self.headers) | |
if self.body: | |
error_message += "HTTP response body: {0}\n".format(self.body) | |
return error_message |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment