Skip to content

Instantly share code, notes, and snippets.

@artpel
Last active February 10, 2022 11:35
Show Gist options
  • Save artpel/abe4ef48b76abed1aead48aa28d15b96 to your computer and use it in GitHub Desktop.
Save artpel/abe4ef48b76abed1aead48aa28d15b96 to your computer and use it in GitHub Desktop.
Looker API Python Client
import json
import os
import looker_sdk
class LookerClient:
# Thin client around the Looker SDK, used to talk to the Looker API
# and fetch our content metadata.
# We only keep a subset of each response: the payloads carry a **lot** of fields we don't need,
# and holding all of them is heavily memory-consuming.
def __init__(self, base_url=None, client_id=None, client_secret=None):
    """Create the Looker SDK client used by every other method.

    Each argument, when given, is exported to the matching ``LOOKERSDK_*``
    environment variable before the SDK is initialised. Arguments left as
    ``None`` leave the current environment untouched, so ``LookerClient()``
    keeps working when the variables are already configured.

    Args:
        base_url: Value for ``LOOKERSDK_BASE_URL``.
        client_id: Value for ``LOOKERSDK_CLIENT_ID``.
        client_secret: Value for ``LOOKERSDK_CLIENT_SECRET``.
    """
    # Credentials are injected by the caller or the environment rather than
    # hard-coded here, so secrets never live in the source file.
    overrides = {
        "LOOKERSDK_BASE_URL": base_url,
        "LOOKERSDK_CLIENT_ID": client_id,
        "LOOKERSDK_CLIENT_SECRET": client_secret,
    }
    for variable, value in overrides.items():
        if value is not None:
            os.environ[variable] = value
    # NOTE(review): presumably init31() reads the LOOKERSDK_* variables set
    # above - confirm against the installed looker_sdk version.
    self.sdk = looker_sdk.init31()
def search_user_by_email(self, email):
    """Search Looker users by email address.

    Args:
        email: Email value forwarded to the SDK's ``search_users`` call.

    Returns:
        Whatever ``self.sdk.search_users`` returns.
    """
    print(f"Finding Looker users with their email: {email}")
    # Pass the filter by keyword: a positional argument would bind to the
    # SDK method's first parameter instead of its ``email`` filter.
    return self.sdk.search_users(email=email)
def search_user_by_first_name(self, first_name: str):
    """Search Looker users by first name.

    Args:
        first_name: Value passed as the ``first_name`` keyword of the SDK's
            ``search_users`` call.

    Returns:
        Whatever ``self.sdk.search_users`` returns.
    """
    print(f"Finding Looker users with their first_name: {first_name}")
    return self.sdk.search_users(first_name=first_name)
def delete_user(self, user_id: str):
    """Delete the Looker user identified by ``user_id``.

    The SDK's response is discarded; this method always returns ``None``.
    """
    self.sdk.delete_user(user_id)
def get_user(self, user_id):
    """Fetch a single Looker user by ID and return the SDK response as-is."""
    print(f"Getting Looker users with their ID: {user_id}")
    return self.sdk.user(user_id)
def update_user(self, user_id, body):
    """Update a Looker user.

    Args:
        user_id: ID of the user to update.
        body: Update payload forwarded unchanged to the SDK.

    Returns:
        Whatever ``self.sdk.update_user`` returns.
    """
    # The log line names the actual operation and the targeted user; it used
    # to be a copy of the "Getting ..." message from the read methods.
    print(f"Updating Looker user {user_id} with these parameters: {body}")
    return self.sdk.update_user(user_id, body)
def get_model(self, model):
    """Fetch a LookML model by name and return the SDK response as-is."""
    print(f"Getting model {model}")
    # Separate name for the response so the ``model`` argument is not shadowed.
    lookml_model = self.sdk.lookml_model(model)
    return lookml_model
def get_explore_details(self, explore_name, model="swile-snowflake"):
    """Return a trimmed-down description of one explore.

    Args:
        explore_name: Name of the explore to fetch.
        model: LookML model the explore belongs to.

    Returns:
        A dict with the keys ``id``, ``name``, ``description``,
        ``source_file``, ``model_name``, ``base_view_name``,
        ``sql_table_name`` and ``scopes``.
    """
    print(f"Getting explore {explore_name} in model {model}")
    response = self.sdk.lookml_model_explore(model, explore_name)
    # (output key, key read from the SDK response); only ``view_name`` is renamed.
    key_mapping = (
        ("id", "id"),
        ("name", "name"),
        ("description", "description"),
        ("source_file", "source_file"),
        ("model_name", "model_name"),
        ("base_view_name", "view_name"),
        ("sql_table_name", "sql_table_name"),
        ("scopes", "scopes"),
    )
    return {out_key: response[source_key] for out_key, source_key in key_mapping}
def get_fields_metadata_from_explore(self, explore_name, model="swile-snowflake"):
    """Return trimmed metadata for every dimension and measure of an explore.

    Args:
        explore_name: Name of the explore to inspect.
        model: LookML model the explore belongs to.

    Returns:
        A list with one dict per field, dimensions first and then measures.
        A missing (``None``) ``fields``, ``dimensions`` or ``measures`` entry
        is treated as empty instead of raising ``TypeError``.
    """
    print(f"Getting fields from explore {explore_name} in model {model}")
    explore = self.sdk.lookml_model_explore(model, explore_name)
    fields = explore["fields"]
    # Either collection may be absent on the response; fall back to empty.
    dimensions = (fields["dimensions"] if fields is not None else None) or []
    measures = (fields["measures"] if fields is not None else None) or []
    return [
        {
            "explore": explore_name,
            "view": field["view"],
            "name": field["name"],
            "label": field["label"],
            "description": field["description"],
            "hidden": field["hidden"],
            "type": field["type"],
            "primary_key": field["primary_key"],
            "sql": field["sql"],
            "lookml_field_type": self._determine_lookml_field_type(
                field["dimension_group"], field["measure"]
            ),
            "lookml_link": field["lookml_link"],
        }
        for field in [*dimensions, *measures]
    ]
def get_all_dashboard_ids(self):
    """Return the IDs of all dashboards whose ``model`` entry is falsy."""
    print("Getting dashboard IDs for all dashboards")
    # Only dashboards without a model are kept.
    # NOTE(review): the original comment described this as removing custom
    # dashboards not on the swile-snowflake model - confirm the intent.
    return [
        dashboard["id"]
        for dashboard in self.sdk.all_dashboards()
        if not dashboard["model"]
    ]
def get_dashboard_metadata(self, dashboard_id):
    """Return a trimmed-down description of one dashboard.

    Returns:
        A dict with the keys ``id``, ``title``, ``user_id``, ``folder``
        (as built by ``_return_folder_data``), ``last_viewed_at`` and
        ``deleted``.
    """
    dashboard = self.sdk.dashboard(dashboard_id)
    folder_summary = self._return_folder_data(dashboard["folder"])
    return {
        "id": dashboard["id"],
        "title": dashboard["title"],
        "user_id": dashboard["user_id"],
        "folder": folder_summary,
        "last_viewed_at": dashboard["last_viewed_at"],
        "deleted": dashboard["deleted"],
    }
def get_look(self, look_id):
    """Return a trimmed-down description of one Look.

    Returns:
        A dict with the keys ``id``, ``title``, ``created_at``,
        ``deleted_at``, ``last_accessed_at``, ``query_id``, ``user_id`` and
        ``folder`` (as built by ``_return_folder_data``).
    """
    print(f"Getting metadata from Look {look_id}")
    look = self.sdk.look(look_id)
    copied_keys = (
        "id",
        "title",
        "created_at",
        "deleted_at",
        "last_accessed_at",
        "query_id",
        "user_id",
    )
    summary = {key: look[key] for key in copied_keys}
    summary["folder"] = self._return_folder_data(look["folder"])
    return summary
def get_query(self, query_id):
    """Return a trimmed-down description of one Looker query.

    Returns:
        A dict with the keys ``id``, ``view``, ``fields``, ``pivots``,
        ``filters``, ``sorts``, ``limit``, ``column_limit`` and
        ``share_url``. ``filters`` is copied into a plain ``dict`` when
        truthy and is ``None`` otherwise.
    """
    print(f"Getting metadata from Looker query {query_id}")
    query = self.sdk.query(query_id)
    raw_filters = query["filters"]
    if raw_filters:
        filters = dict(raw_filters)
    else:
        filters = None
    return {
        "id": query["id"],
        "view": query["view"],
        "fields": query["fields"],
        "pivots": query["pivots"],
        "filters": filters,
        "sorts": query["sorts"],
        "limit": query["limit"],
        "column_limit": query["column_limit"],
        "share_url": query["share_url"],
    }
def get_dashboard_visualization_elements(self, dashboard_id):
    """Return the elements of a dashboard whose ``type`` is ``"vis"``.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``note_text``,
        ``dashboard_id``, ``query_id`` and ``look_id``.
    """
    print(f"Getting dashboard visualization elements from Looker dashboard {dashboard_id}")
    dashboard = self.sdk.dashboard(dashboard_id)
    return [
        {
            "id": item["id"],
            "title": item["title"],
            "note_text": item["note_text"],
            "dashboard_id": item["dashboard_id"],
            "query_id": item["query_id"],
            "look_id": item["look_id"],
        }
        for item in dashboard["dashboard_elements"]
        if item["type"] == "vis"
    ]
def get_dashboard_filter_elements(self, dashboard_id):
    """Return a trimmed-down description of every filter on a dashboard.

    Returns:
        A list of dicts, one per entry of the dashboard's
        ``dashboard_filters``, each restricted to the keys listed in
        ``kept_keys`` below.
    """
    print(f"Getting dashboard filter elements from Looker dashboard {dashboard_id}")
    kept_keys = (
        "id",
        "dashboard_id",
        "name",
        "title",
        "type",
        "explore",
        "dimension",
        "allow_multiple_values",
        "required",
        "default_value",
    )
    dashboard = self.sdk.dashboard(dashboard_id)
    return [
        {key: dashboard_filter[key] for key in kept_keys}
        for dashboard_filter in dashboard["dashboard_filters"]
    ]
def get_all_looks(self):
    """Return a trimmed-down description of every Look.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``created_at``,
        ``deleted_at``, ``last_accessed_at``, ``query_id``, ``user_id`` and
        ``folder`` (as built by ``_return_folder_data``).
    """
    print("Getting all looks")
    return [
        {
            "id": entry["id"],
            "title": entry["title"],
            "created_at": entry["created_at"],
            "deleted_at": entry["deleted_at"],
            "last_accessed_at": entry["last_accessed_at"],
            "query_id": entry["query_id"],
            "user_id": entry["user_id"],
            "folder": self._return_folder_data(entry["folder"]),
        }
        for entry in self.sdk.all_looks()
    ]
def create_query(self, explore: str, fields: list, model="swile-snowflake", limit=0):
    """Create a Looker query and return the SDK response as-is.

    Args:
        explore: Explore name, sent as the ``view`` entry of the request body.
        fields: Fields to select.
        model: LookML model name.
        limit: Value sent as the ``limit`` entry of the request body.
    """
    print(f"Creating query for fields {fields} in explore {explore}")
    request_body = dict(model=model, view=explore, fields=fields, limit=limit)
    return self.sdk.create_query(request_body)
def run_query(self, query_id, result_type="json"):
    """Run a saved Looker query and return its result.

    Args:
        query_id: ID of the query to run.
        result_type: Result format requested from the SDK.

    Returns:
        The decoded JSON payload when ``result_type`` starts with ``"json"``;
        otherwise the SDK's raw result, unchanged.
    """
    print(f"Running query {query_id}")
    query_result = self.sdk.run_query(query_id, result_type)
    # Only JSON-family formats can be decoded; any other format is handed
    # back untouched instead of crashing inside json.loads.
    if result_type.startswith("json"):
        return json.loads(query_result)
    return query_result
def get_content_errors_from_content_validator(self):
    """Flatten the content validator response into one dict per error.

    Returns:
        A list of dicts with the keys ``id``, ``look_id``, ``dashboard_id``,
        ``query_id``, ``message``, ``field_name``, ``model_name``,
        ``explore_name`` and ``error_location``. Content entries whose
        ``errors`` is empty or ``None`` contribute nothing.
    """
    clean_errors = []
    validation = self.sdk.content_validation()
    for content in validation["content_with_errors"] or []:
        errors = content["errors"] or []
        if not errors:
            continue

        # Everything below is identical for all errors of this content item,
        # so it is resolved once instead of once per error.
        look = content["look"]
        dashboard_element = content["dashboard_element"]
        dashboard_filter = content["dashboard_filter"]

        # The dashboard ID comes from the element when there is one,
        # otherwise from the filter.
        if dashboard_element:
            dashboard_id = dashboard_element["dashboard_id"]
        elif dashboard_filter:
            dashboard_id = dashboard_filter["dashboard_id"]
        else:
            dashboard_id = None

        error_location = self._return_content_error_location(
            look, dashboard_element, dashboard_filter
        )
        # Weird, but the Looker API only sets model_name and explore_name on
        # the first error, so every row borrows them from there.
        first_error = errors[0]

        for error in errors:
            clean_errors.append(
                {
                    "id": content["id"],
                    "look_id": look["id"] if look else None,
                    "dashboard_id": dashboard_id,
                    "query_id": dashboard_element["query_id"] if dashboard_element else None,
                    "message": error["message"],
                    "field_name": error["field_name"],
                    "model_name": first_error["model_name"],
                    "explore_name": first_error["explore_name"],
                    "error_location": error_location,
                }
            )
    return clean_errors
def _return_folder_data(self, folder):
    """Summarise a folder as ``folder_id`` / ``in_personal_folder``.

    Returns an empty dict when ``folder`` is falsy.
    """
    if not folder:
        return {}
    return {"folder_id": folder["id"], "in_personal_folder": folder["is_personal"]}
def _return_content_error_location(self, look, dashboard_element, dashboard_filter):
    """Name the kind of content an error is attached to.

    The first truthy argument wins, in the order look, dashboard element,
    dashboard filter.

    Returns:
        ``"look"``, ``"dashboard_element"``, ``"dashboard_filter"``, or
        ``None`` when all three arguments are falsy.
    """
    if look:
        return "look"
    if dashboard_element:
        return "dashboard_element"
    if dashboard_filter:
        return "dashboard_filter"
    # Explicit return: this branch used to be a bare ``None`` expression.
    return None
def _determine_lookml_field_type(self, dimension_group, measure):
    """Classify a LookML field from its ``dimension_group`` and ``measure`` values.

    A truthy ``dimension_group`` takes precedence over a truthy ``measure``;
    when both are falsy the field is a plain ``"dimension"``.
    """
    if dimension_group:
        return "dimension_group"
    return "measure" if measure else "dimension"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment