Template code for the third milestone in my Medium blog series on building impact analysis reports for Tableau.
from sqlalchemy import create_engine
import pandas as pd
from pandas import json_normalize  # pandas >= 1.0; older versions imported this from pandas.io.json
from tableau_api_lib import TableauServerConnection
from tableau_api_lib.utils import flatten_dict_column, flatten_dict_list_column
from tableau_api_lib.utils.querying import get_projects_dataframe
from tableauhyperapi import HyperProcess, Connection, TableDefinition, SqlType, Telemetry, Inserter, CreateMode, TableName
from tableauhyperapi import escape_string_literal
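
# assumed dependencies (PyPI names, versions not pinned): tableau-api-lib, tableauhyperapi, pandas, sqlalchemy,
# plus a PostgreSQL driver such as psycopg2 for the 'postgresql://' connection string used below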
# using personal access tokens is preferred; otherwise, comment those details out and use username / password
tableau_server_config = {
    'my_env': {
        'server': 'https://mytableauserver.com',  # replace with your own server
        'api_version': '3.8',  # replace with your REST API version
        'personal_access_token_name': '<PAT NAME>',
        'personal_access_token_secret': '<PAT SECRET>',
        # 'username': '<USERNAME>',
        # 'password': '<PASSWORD>',
        'site_name': '<your-pretty-site-name>',  # if accessing your default site, set this to ''
        'site_url': '<YourSiteContentUrl>'  # if accessing your default site, set this to ''
    }
}
pg_config = {
    'host': '<your_host>',
    'port': '8060',
    'database': 'workgroup',
    'username': 'readonly',
    'password': '<your_password>'
}
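
# pg_config targets the Tableau Server repository (the 'workgroup' PostgreSQL database, default port 8060);
# the 'readonly' repository user must first be enabled on the server (typically via
# 'tsm data-access repository-access enable'; verify the command for your server version)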
pg_query = """
WITH events AS (
    SELECT DISTINCT MAX(DATE_TRUNC('day', created_at)) AS last_interaction_date
    , hist_target_site_id
    , hist_view_id
    , historical_event_type_id
    FROM historical_events
    GROUP BY 2, 3, 4
),
event_types AS (
    SELECT DISTINCT type_id
    , name
    , action_type
    FROM historical_event_types
),
temp_sites AS (
    SELECT DISTINCT name
    , id
    , site_id AS hist_site_id
    FROM hist_sites
),
temp_views AS (
    SELECT DISTINCT name
    , id
    , view_id AS hist_view_id
    FROM hist_views
),
sites_core AS (
    SELECT DISTINCT name AS site_name
    , id
    , luid AS site_id
    FROM sites
),
views_core AS (
    SELECT DISTINCT name AS view_name
    , id
    , luid AS view_id
    FROM views
),
view_stats AS (
    SELECT views.id
    , views.name AS view_name
    , views.workbook_id
    , views.site_id
    , views.luid AS view_id
    , sites.name AS site_name
    , workbooks.name AS workbook_name
    , SUM(views_stats.nviews) AS num_views
    FROM views
    LEFT JOIN views_stats ON views.id = views_stats.view_id
    LEFT JOIN sites ON views.site_id = sites.id
    LEFT JOIN workbooks ON views.workbook_id = workbooks.id
    GROUP BY 1, 2, 3, 4, 5, 6, 7
    ORDER BY num_views, view_id DESC
)
SELECT MAX(last_interaction_date) AS last_interaction_date
, sites_core.site_name
, sites_core.site_id
, views_core.view_name
, views_core.view_id
, view_stats.workbook_name
, MAX(num_views) AS num_views
FROM events
LEFT JOIN event_types ON events.historical_event_type_id = event_types.type_id
LEFT JOIN temp_sites ON events.hist_target_site_id = temp_sites.id
LEFT JOIN sites_core ON temp_sites.hist_site_id = sites_core.id
LEFT JOIN temp_views ON events.hist_view_id = temp_views.id
LEFT JOIN views_core ON temp_views.hist_view_id = views_core.id
LEFT JOIN view_stats ON views_core.view_id = view_stats.view_id
WHERE views_core.view_id IS NOT NULL
AND action_type IN ('Send E-Mail', 'Access', 'Create', 'Publish')
GROUP BY 2, 3, 4, 5, 6;
"""
query_workbooks = """
{
  workbooks {
    workbook_name: name
    workbook_id: luid
    workbook_project: projectName
    views {
      view_type: __typename
      view_name: name
      view_id: luid
    }
    upstreamTables {
      upstr_table_name: name
      upstr_table_id: luid
      upstreamDatabases {
        upstr_db_name: name
        upstr_db_type: connectionType
        upstr_db_id: luid
        upstr_db_isEmbedded: isEmbedded
      }
    }
    upstreamDatasources {
      upstr_ds_name: name
      upstr_ds_id: luid
      upstr_ds_project: projectName
    }
    embeddedDatasources {
      emb_ds_name: name
    }
    upstreamFlows {
      flow_name: name
      flow_id: luid
      flow_project: projectName
    }
  }
}
"""
query_databases = """
{
  databaseServers {
    database_hostname: hostName
    database_port: port
    database_id: luid
  }
}
"""
# get the desired data from the PostgreSQL database
pg_connection_string = "postgresql://{0}:{1}@{2}:{3}/{4}".format(pg_config['username'],
                                                                 pg_config['password'],
                                                                 pg_config['host'],
                                                                 pg_config['port'],
                                                                 pg_config['database'])
str_cols = ['site_name', 'site_id', 'view_name', 'view_id', 'workbook_name']
core_cols = ['view_id', 'last_interaction_date', 'num_views']
sql_engine = create_engine(pg_connection_string)
results_df = pd.read_sql_query(pg_query, con=sql_engine)
results_df[str_cols] = results_df[str_cols].astype(str)
pg_views_df = results_df[core_cols]
print(pg_views_df.head())
# establish a connection with Tableau Server and extract the desired metadata from the Metadata API
conn = TableauServerConnection(tableau_server_config, 'my_env')
conn.sign_in()
wb_query_results = conn.metadata_graphql_query(query_workbooks)
db_query_results = conn.metadata_graphql_query(query_databases)
db_query_results_json = db_query_results.json()['data']['databaseServers']
wb_query_results_json = wb_query_results.json()['data']['workbooks']
wb_df = json_normalize(wb_query_results_json)
wb_df.drop(columns=['views', 'upstreamTables', 'upstreamDatasources', 'embeddedDatasources', 'upstreamFlows'], inplace=True)
wb_views_df = json_normalize(data=wb_query_results_json, record_path='views', meta='workbook_id')
wb_tables_df = json_normalize(data=wb_query_results_json, record_path='upstreamTables', meta='workbook_id')
wb_tables_dbs_df = flatten_dict_list_column(df=wb_tables_df, col_name='upstreamDatabases')
db_df = pd.DataFrame(db_query_results_json)
wb_uds_df = json_normalize(data=wb_query_results_json, record_path='upstreamDatasources', meta='workbook_id')
wb_eds_df = json_normalize(data=wb_query_results_json, record_path='embeddedDatasources', meta='workbook_id')
wb_flows_df = json_normalize(data=wb_query_results_json, record_path='upstreamFlows', meta='workbook_id')
combined_df = wb_df.merge(wb_views_df, how='left', on='workbook_id')
combined_df = combined_df.merge(wb_tables_dbs_df, how='left', on='workbook_id')
combined_df = combined_df.merge(db_df, how='left', left_on='upstr_db_id', right_on='database_id')
combined_df = combined_df.merge(wb_uds_df, how='left', on='workbook_id')
combined_df = combined_df.merge(wb_eds_df, how='left', on='workbook_id')
combined_df['summary_date'] = pd.Timestamp.now()  # pd.datetime is deprecated; pd.Timestamp.now() is the supported equivalent
combined_df['site_name'] = conn.site_name
# combine the Metadata from milestone 1 with our PostgreSQL data from milestone 2
combined_df = combined_df.merge(pg_views_df, how='left', on=['view_id'])
# END MILESTONES 1 & 2

# BEGIN MILESTONE 3
# take a look at the data types in your DataFrame, so you have an idea of how to feed data into the Hyper API
print(combined_df.dtypes)

# a bit of pre-processing on our DataFrame so the Hyper API doesn't throw errors due to NaN values
non_date_columns = [col for col in combined_df.columns if col not in ['last_interaction_date', 'summary_date']]
combined_df[non_date_columns] = combined_df[non_date_columns].fillna('')
combined_df['database_port'] = combined_df['database_port'].astype('str')
# views with no recorded traffic are empty strings after the fillna above; Series.replace swaps whole
# values, whereas str.replace('', '0') would interleave '0' between characters and corrupt the counts
combined_df['num_views'] = combined_df['num_views'].replace('', '0')
combined_df['num_views'] = combined_df['num_views'].astype('float64')
combined_df['last_interaction_date'] = combined_df['last_interaction_date'].apply(lambda x: str(x.date()) if pd.notnull(x) else '')
combined_df['summary_date'] = combined_df['summary_date'].apply(lambda x: str(x.date()))
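
# optional sanity check: confirm the fillna above removed all NaN values before handing rows to Hyper
assert not combined_df[non_date_columns].isna().any().any(), 'NaN values remain in non-date columns'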
# create the .hyper extract file
PATH_TO_HYPER = 'workbooks_and_owners.hyper'

# Step 1: Start a new private local Hyper instance
with HyperProcess(Telemetry.SEND_USAGE_DATA_TO_TABLEAU, 'myapp') as hyper:

    # Step 2: Create the .hyper file, replacing it if it already exists
    with Connection(endpoint=hyper.endpoint,
                    create_mode=CreateMode.CREATE_AND_REPLACE,
                    database=PATH_TO_HYPER) as connection:

        # Step 3: Create the schema
        connection.catalog.create_schema('Extract')
        # Step 4: Create the table definition
        schema = TableDefinition(table_name=TableName('Extract', 'Extract'),
                                 columns=[
                                     TableDefinition.Column('workbook_name', SqlType.text()),
                                     TableDefinition.Column('workbook_id', SqlType.text()),
                                     TableDefinition.Column('workbook_project', SqlType.text()),
                                     TableDefinition.Column('view_type', SqlType.text()),
                                     TableDefinition.Column('view_name', SqlType.text()),
                                     TableDefinition.Column('view_id', SqlType.text()),
                                     TableDefinition.Column('upstr_table_name', SqlType.text()),
                                     TableDefinition.Column('upstr_table_id', SqlType.text()),
                                     TableDefinition.Column('upstr_db_name', SqlType.text()),
                                     TableDefinition.Column('upstr_db_type', SqlType.text()),
                                     TableDefinition.Column('upstr_db_id', SqlType.text()),
                                     TableDefinition.Column('upstr_db_isEmbedded', SqlType.bool()),
                                     TableDefinition.Column('database_hostname', SqlType.text()),
                                     TableDefinition.Column('database_port', SqlType.text()),
                                     TableDefinition.Column('database_id', SqlType.text()),
                                     TableDefinition.Column('upstr_ds_name', SqlType.text()),
                                     TableDefinition.Column('upstr_ds_id', SqlType.text()),
                                     TableDefinition.Column('upstr_ds_project', SqlType.text()),
                                     TableDefinition.Column('emb_ds_name', SqlType.text()),
                                     TableDefinition.Column('summary_date', SqlType.text()),
                                     TableDefinition.Column('site_name', SqlType.text()),
                                     TableDefinition.Column('last_interaction_date', SqlType.text()),
                                     TableDefinition.Column('num_views', SqlType.double())
                                 ])
        # Step 5: Create the table in the connection catalog
        connection.catalog.create_table(schema)
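
        # note: Inserter.add_row consumes values positionally, so combined_df's column order must
        # match the TableDefinition above; if in doubt, reorder the DataFrame first, e.g.
        # combined_df = combined_df[[col.name.unescaped for col in schema.columns]]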
        with Inserter(connection, schema) as inserter:
            for index, row in combined_df.iterrows():
                inserter.add_row(row)
            inserter.execute()

print("The connection to the Hyper file is closed.")
# publish the .hyper extract to Tableau Server
# get information about your projects; you need a project ID to publish to
projects_df = get_projects_dataframe(conn)
print(projects_df[['name', 'id']])

response = conn.publish_data_source(datasource_file_path=PATH_TO_HYPER,
                                    datasource_name='Impact Analysis Milestone3',
                                    project_id='<YOUR_PROJECT_ID>')

# verify that the response indicates the datasource was published successfully
print(response.json())
conn.sign_out()