Created
January 11, 2021 13:32
-
-
Save giuseppellrusso/03daabba54424ceac83309921121320c to your computer and use it in GitHub Desktop.
Python module developed to programmatically run Domo datasets and dataflows
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pydomo import Domo | |
import logging | |
import json | |
import requests | |
""" Connect to Domo using the Pydomo library and the client id and secret | |
Documentation: | |
https://developer.domo.com/docs/authentication/quickstart-5 | |
Args: | |
client_id (str): client_id to connect with | |
client_secret (str): client secret to connect with | |
api_host (str): host of the Domo api | |
domo_log_level (logging): level of Domo logger. Default set to DEBUG | |
Returns: | |
an instance of the Domo object from Pydomo library
""" | |
def get_domo_connection(client_id, client_secret, api_host, domo_log_level=logging.DEBUG):
    """Connect to Domo through the Pydomo library using a client id and secret.

    Documentation:
        https://developer.domo.com/docs/authentication/quickstart-5

    Args:
        client_id (str): client id to connect with
        client_secret (str): client secret to connect with
        api_host (str): host of the Domo api
        domo_log_level (int): logging level handed to the Domo object.
            Defaults to logging.DEBUG.

    Returns:
        The Domo object built by the Pydomo library.

    Raises:
        Exception: if building the Domo object fails. The original error is
            logged and attached as the cause of the raised exception.
    """
    try:
        return Domo(client_id, client_secret, log_level=domo_log_level, api_host=api_host)
    except Exception as e:
        logging.error(e)
        # Still a plain Exception carrying the original message (what callers
        # already receive), but with the underlying error chained explicitly.
        raise Exception(e) from e
""" Gets the stream associated with a Domo dataset | |
Args: | |
streams (pydomo object): pydomo object containing the list of streams of a domo instance | |
dataset_id (str): alphanumeric id of a domo dataset | |
stream_list_limit (int): limit of streams to retrieve from the pydomo streams object | |
Returns: | |
the pydomo stream object of the stream associated with the given dataset | |
""" | |
def get_stream_by_dataset_id(streams, dataset_id, stream_list_limit=2000):
    """Return the stream associated with a Domo dataset.

    Args:
        streams (pydomo object): object exposing ``list(limit, offset)``,
            which returns the streams of a Domo instance
        dataset_id (str): alphanumeric id of a domo dataset
        stream_list_limit (int): maximum number of streams requested from
            ``streams.list``. Only this first page (offset 0) is searched.

    Returns:
        The stream entry whose ``['dataSet']['id']`` equals ``dataset_id``.

    Raises:
        Exception: if zero streams, or more than one stream, match the dataset.
    """
    stream_list = streams.list(stream_list_limit, 0)

    # Entries without a 'dataSet' value are skipped instead of aborting the
    # whole lookup with a KeyError.
    dataset_streams = [
        stream for stream in stream_list
        if stream.get('dataSet') and stream['dataSet'].get('id') == dataset_id
    ]

    if len(dataset_streams) == 1:
        return dataset_streams[0]

    # Tell "nothing matched" apart from "ambiguous match" so the logged
    # message describes what actually happened.
    if not dataset_streams:
        error_string = 'No stream found for dataset {}'.format(dataset_id)
    else:
        error_string = 'Found {} streams for dataset {}, expected exactly one'.format(
            len(dataset_streams), dataset_id)
    logging.error(error_string)
    raise Exception(error_string)
""" Runs a Domo dataset | |
The method is actually creating a new execution for the stream associated to the dataset | |
Args: | |
domo_connection (pydomo object): instance of the pydomo Domo object | |
dataset_id (str): alphanumeric id of a domo dataset | |
""" | |
def run_dataset(domo_connection, dataset_id):
    """Run a Domo dataset.

    The dataset is run by creating a new execution for the stream
    associated with it.

    Args:
        domo_connection (pydomo object): instance of the pydomo Domo object;
            its ``streams`` attribute is used for the lookup and the execution
        dataset_id (str): alphanumeric id of a domo dataset

    Raises:
        Exception: if the stream lookup or the execution request fails. The
            original error is logged and attached as the cause.
    """
    try:
        streams = domo_connection.streams
        dataset_stream = get_stream_by_dataset_id(streams, dataset_id)
        streams.create_execution(dataset_stream['id'])
        logging.info('Successfully run dataset id {}'.format(dataset_id))
    except Exception as e:
        logging.error(e)
        # Same exception type and message as before, with the underlying
        # error chained explicitly.
        raise Exception(e) from e
""" Gets a session token to be used for undocumented private api | |
Args: | |
domo_instance (str): name of the domo instance | |
email (str): email used to login to the domo instance | |
password (str): password used to login to the domo instance | |
Returns: | |
the session token | |
""" | |
def get_session_token(domo_instance, email, password, timeout=60):
    """Get a session token to be used for the undocumented private api.

    Args:
        domo_instance (str): name of the domo instance (the subdomain of
            ``domo.com`` the request is sent to)
        email (str): email used to login to the domo instance
        password (str): password used to login to the domo instance
        timeout (float or tuple): timeout in seconds passed to
            ``requests.post`` so a stalled connection cannot block forever.
            Defaults to 60.

    Returns:
        The value of ``sessionToken`` in the JSON response body.

    Raises:
        Exception: if the response status code is not 200.
    """
    auth_api = 'https://{}.domo.com/api/content/v2/authentication'.format(domo_instance)
    auth_body = json.dumps({
        "method": "password",
        "emailAddress": email,
        "password": password,
    })
    auth_headers = {'Content-Type': 'application/json'}
    auth_response = requests.post(
        auth_api, data=auth_body, headers=auth_headers, timeout=timeout)
    auth_status = auth_response.status_code

    if auth_status != 200:
        token_error_string = 'Token request ended up with status code {}'.format(auth_status)
        logging.error(token_error_string)
        logging.error(auth_response.text)
        raise Exception(token_error_string)

    logging.info('Session token acquired.')
    return auth_response.json()['sessionToken']
""" Runs a Domo dataflow | |
Args: | |
domo_instance (str): name of the domo instance | |
domo_token (str): session token used to authenticate to the private apis | |
dataflow_id (str): id of the dataflow to be run | |
Returns: | |
the status of the post request used for running the dataflow | |
""" | |
def run_dataflow(domo_instance, domo_token, dataflow_id, timeout=60):
    """Run a Domo dataflow.

    Args:
        domo_instance (str): name of the domo instance (the subdomain of
            ``domo.com`` the request is sent to)
        domo_token (str): session token sent in the ``x-domo-authentication``
            header to authenticate to the private apis
        dataflow_id (str): id of the dataflow to be run
        timeout (float or tuple): timeout in seconds passed to
            ``requests.post`` so a stalled connection cannot block forever.
            Defaults to 60.

    Returns:
        The status code of the post request used for running the dataflow
        (always 200, since any other status raises).

    Raises:
        Exception: if the response status code is not 200.
    """
    dataflow_api = 'https://{}.domo.com/api/dataprocessing/v1/dataflows/{}/executions'.format(
        domo_instance, dataflow_id)
    dataflow_headers = {
        'Content-Type': 'application/json',
        'x-domo-authentication': domo_token,
    }
    dataflow_response = requests.post(
        url=dataflow_api, headers=dataflow_headers, timeout=timeout)
    dataflow_status = dataflow_response.status_code

    if dataflow_status != 200:
        dataflow_error_string = 'Dataflow run request ended up with status code {}'.format(dataflow_status)
        logging.error(dataflow_error_string)
        logging.error(dataflow_response.text)
        raise Exception(dataflow_error_string)

    logging.info('Successfully run dataflow id {}'.format(dataflow_id))
    return dataflow_status
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment