Template code for the third milestone in my Medium blog series on building impact analysis reports for Tableau.
from sqlalchemy import create_engine
import pandas as pd
from pandas import json_normalize  # json_normalize now lives in the top-level pandas namespace; the pandas.io.json path is deprecated
from tableau_api_lib import TableauServerConnection
from tableau_api_lib.utils import flatten_dict_column, flatten_dict_list_column
from tableau_api_lib.utils.querying import get_projects_dataframe
from tableauhyperapi import HyperProcess, Connection, TableDefinition, SqlType, Telemetry, Inserter, CreateMode, TableName
from tableauhyperapi import escape_string_literal
# using personal access tokens is preferred; otherwise, comment those details out and use username / password
tableau_server_config = {
    'my_env': {
        'server': 'https://mytableauserver.com',  # replace with your own server
        'api_version': '3.8',  # replace with your REST API version
        'personal_access_token_name': '<PAT NAME>',
        'personal_access_token_secret': '<PAT SECRET>',
        # 'username': '<USERNAME>',
        # 'password': '<PASSWORD>',
        'site_name': '<your-pretty-site-name>',  # if accessing your default site, set this to ''
        'site_url': '<YourSiteContentUrl>'  # if accessing your default site, set this to ''
    }
}
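# Hard-coding secrets is fine for a quick demo, but environment variables travel better once this
# script gets shared or scheduled. A minimal sketch, assuming you export TABLEAU_PAT_NAME and
# TABLEAU_PAT_SECRET yourself (those variable names are placeholders of mine, not Tableau's);
# the import would normally sit with the others at the top of the file.
import os

tableau_server_config['my_env']['personal_access_token_name'] = os.getenv(
    'TABLEAU_PAT_NAME', tableau_server_config['my_env']['personal_access_token_name'])
tableau_server_config['my_env']['personal_access_token_secret'] = os.getenv(
    'TABLEAU_PAT_SECRET', tableau_server_config['my_env']['personal_access_token_secret'])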
pg_config = {
    'host': '<your_host>',
    'port': '8060',
    'database': 'workgroup',
    'username': 'readonly',
    'password': '<your_password>'
}
pg_query = """
WITH events AS (
SELECT DISTINCT MAX(DATE_TRUNC('day', created_at)) AS last_interaction_date
, hist_target_site_id
, hist_view_id
, historical_event_type_id
FROM historical_events
GROUP BY 2, 3, 4
),
event_types AS (
SELECT DISTINCT type_id
, name
, action_type
FROM historical_event_types
),
temp_sites AS (
SELECT DISTINCT name
, id
, site_id AS hist_site_id
FROM hist_sites
),
temp_views AS (
SELECT DISTINCT name
, id
, view_id AS hist_view_id
FROM hist_views
),
sites_core AS (
SELECT DISTINCT name AS site_name
, id
, luid AS site_id
FROM sites
),
views_core AS (
SELECT DISTINCT name AS view_name
, id
, luid AS view_id
FROM views
),
view_stats AS (
SELECT views.id
, views.name AS view_name
, views.workbook_id
, views.site_id
, views.luid AS view_id
, sites.name AS site_name
, workbooks.name AS workbook_name
, SUM(views_stats.nviews) AS num_views
FROM views
LEFT JOIN views_stats ON views.id = views_stats.view_id
LEFT JOIN sites ON views.site_id = sites.id
LEFT JOIN workbooks ON views.workbook_id = workbooks.id
GROUP BY 1, 2, 3, 4, 5, 6, 7
ORDER BY num_views, view_id DESC
)
SELECT MAX(last_interaction_date) AS last_interaction_date
, sites_core.site_name
, sites_core.site_id
, views_core.view_name
, views_core.view_id
, view_stats.workbook_name
, MAX(num_views) AS num_views
FROM events
LEFT JOIN event_types ON events.historical_event_type_id = event_types.type_id
LEFT JOIN temp_sites ON events.hist_target_site_id = temp_sites.id
LEFT JOIN sites_core ON temp_sites.hist_site_id = sites_core.id
LEFT JOIN temp_views ON events.hist_view_id = temp_views.id
LEFT JOIN views_core ON temp_views.hist_view_id = views_core.id
LEFT JOIN view_stats ON views_core.view_id = view_stats.view_id
WHERE views_core.view_id IS NOT NULL
AND action_type IN ('Send E-Mail', 'Access', 'Create', 'Publish')
GROUP BY 2, 3, 4, 5, 6;
"""
query_workbooks = """
{
  workbooks {
    workbook_name: name
    workbook_id: luid
    workbook_project: projectName
    views {
      view_type: __typename
      view_name: name
      view_id: luid
    }
    upstreamTables {
      upstr_table_name: name
      upstr_table_id: luid
      upstreamDatabases {
        upstr_db_name: name
        upstr_db_type: connectionType
        upstr_db_id: luid
        upstr_db_isEmbedded: isEmbedded
      }
    }
    upstreamDatasources {
      upstr_ds_name: name
      upstr_ds_id: luid
      upstr_ds_project: projectName
    }
    embeddedDatasources {
      emb_ds_name: name
    }
    upstreamFlows {
      flow_name: name
      flow_id: luid
      flow_project: projectName
    }
  }
}
"""
query_databases = """
{
  databaseServers {
    database_hostname: hostName
    database_port: port
    database_id: luid
  }
}
"""
# get the desired data from the PostgreSQL database
pg_connection_string = "postgresql://{0}:{1}@{2}:{3}/{4}".format(pg_config['username'],
                                                                 pg_config['password'],
                                                                 pg_config['host'],
                                                                 pg_config['port'],
                                                                 pg_config['database'])
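# If your repository password contains characters like '@' or ':', the formatted string above will
# parse incorrectly. A sketch of a safer route, assuming you are on SQLAlchemy 1.4+ where
# sqlalchemy.engine.URL.create is available (otherwise skip this block):
# from sqlalchemy.engine import URL
# pg_connection_string = URL.create(
#     drivername='postgresql',
#     username=pg_config['username'],
#     password=pg_config['password'],
#     host=pg_config['host'],
#     port=int(pg_config['port']),
#     database=pg_config['database'])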
str_cols = ['site_name', 'site_id', 'view_name', 'view_id', 'workbook_name']
core_cols = ['view_id', 'last_interaction_date', 'num_views']
sql_engine = create_engine(pg_connection_string)
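# optional connectivity probe (my addition, not part of the original milestone): a trivial query
# confirms the readonly repository user, host, and port are right before the heavier query runs
print(pd.read_sql_query('SELECT COUNT(*) AS view_count FROM views', con=sql_engine))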
results_df = pd.read_sql_query(pg_query, con=sql_engine)
results_df[str_cols] = results_df[str_cols].astype(str)
pg_views_df = results_df[core_cols]
print(pg_views_df.head())
# establish a connection with Tableau Server and extract the desired metadata from the Metadata API
conn = TableauServerConnection(tableau_server_config, 'my_env')
conn.sign_in()
wb_query_results = conn.metadata_graphql_query(query_workbooks)
db_query_results = conn.metadata_graphql_query(query_databases)
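# the query helpers return requests-style responses (the .json() calls below rely on that), so a
# quick status check here surfaces a disabled Metadata API or a malformed GraphQL query early
for metadata_response in (wb_query_results, db_query_results):
    if metadata_response.status_code != 200:
        raise RuntimeError(f'Metadata API query failed: {metadata_response.status_code} {metadata_response.text}')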
db_query_results_json = db_query_results.json()['data']['databaseServers']
wb_query_results_json = wb_query_results.json()['data']['workbooks']
wb_df = json_normalize(wb_query_results.json()['data']['workbooks'])
wb_df.drop(columns=['views', 'upstreamTables', 'upstreamDatasources', 'embeddedDatasources', 'upstreamFlows'], inplace=True)
wb_views_df = json_normalize(data=wb_query_results_json, record_path='views', meta='workbook_id')
wb_tables_df = json_normalize(data=wb_query_results_json, record_path='upstreamTables', meta='workbook_id')
wb_tables_dbs_df = flatten_dict_list_column(df=wb_tables_df, col_name='upstreamDatabases')
db_df = pd.DataFrame(db_query_results_json)
wb_uds_df = json_normalize(data=wb_query_results_json, record_path='upstreamDatasources', meta='workbook_id')
wb_eds_df = json_normalize(data=wb_query_results_json, record_path='embeddedDatasources', meta='workbook_id')
wb_flows_df = json_normalize(data=wb_query_results_json, record_path='upstreamFlows', meta='workbook_id')
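# purely diagnostic (safe to delete): a quick look at how many rows each flattened piece produced
# makes it easier to spot fan-out before the chain of left joins below
for frame_name, frame in [('views', wb_views_df), ('tables + databases', wb_tables_dbs_df),
                          ('upstream datasources', wb_uds_df), ('embedded datasources', wb_eds_df),
                          ('upstream flows', wb_flows_df)]:
    print(f'{frame_name}: {frame.shape[0]} rows x {frame.shape[1]} columns')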
combined_df = wb_df.merge(wb_views_df, how='left', on='workbook_id')
combined_df = combined_df.merge(wb_tables_dbs_df, how='left', on='workbook_id')
combined_df = combined_df.merge(db_df, how='left', left_on='upstr_db_id', right_on='database_id')
combined_df = combined_df.merge(wb_uds_df, how='left', on='workbook_id')
combined_df = combined_df.merge(wb_eds_df, how='left', on='workbook_id')
combined_df['summary_date'] = pd.Timestamp.now()  # pd.datetime is deprecated; Timestamp.now() is the supported equivalent
combined_df['site_name'] = conn.site_name
# combine the Metadata from milestone 1 with our PostgreSQL data from milestone 2
combined_df = combined_df.merge(pg_views_df, how='left', on=['view_id'])
# END MILESTONES 1 & 2
# BEGIN MILESTONE 3
# take a look at the data types in your DataFrame, so you have an idea of how to feed data into the Hyper API
print(combined_df.dtypes)
# a bit of pre-processing on our DataFrame so the Hyper API doesn't throw errors due to NaN values
non_date_columns = [col for col in combined_df.columns if col not in ['last_interaction_date', 'summary_date']]
combined_df[non_date_columns] = combined_df[non_date_columns].fillna('')
combined_df['database_port'] = combined_df['database_port'].astype('str')
# blanks in num_views come from views with no usage rows; treat them as 0 so the cast below succeeds
combined_df['num_views'] = combined_df['num_views'].replace('', 0)
combined_df['num_views'] = combined_df['num_views'].astype('float64')
combined_df['last_interaction_date'] = combined_df['last_interaction_date'].apply(lambda x: str(x.date()))
combined_df['summary_date'] = combined_df['summary_date'].apply(lambda x: str(x.date()))
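# optional guard (my addition): any NaN left in a column mapped to SqlType.text() below would trip
# up the insert, so surface stragglers now rather than partway through the insert loop
print(combined_df.isna().sum())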
# create the .hyper extract file
PATH_TO_HYPER = 'workbooks_and_owners.hyper'
# Step 1: Start a new private local Hyper instance
with HyperProcess(Telemetry.SEND_USAGE_DATA_TO_TABLEAU, 'myapp') as hyper:
    # Step 2: Create the .hyper file, replacing it if it already exists
    with Connection(endpoint=hyper.endpoint,
                    create_mode=CreateMode.CREATE_AND_REPLACE,
                    database=PATH_TO_HYPER) as connection:
        # Step 3: Create the schema
        connection.catalog.create_schema('Extract')
        # Step 4: Create the table definition
        schema = TableDefinition(table_name=TableName('Extract', 'Extract'),
                                 columns=[
                                     TableDefinition.Column('workbook_name', SqlType.text()),
                                     TableDefinition.Column('workbook_id', SqlType.text()),
                                     TableDefinition.Column('workbook_project', SqlType.text()),
                                     TableDefinition.Column('view_type', SqlType.text()),
                                     TableDefinition.Column('view_name', SqlType.text()),
                                     TableDefinition.Column('view_id', SqlType.text()),
                                     TableDefinition.Column('upstr_table_name', SqlType.text()),
                                     TableDefinition.Column('upstr_table_id', SqlType.text()),
                                     TableDefinition.Column('upstr_db_name', SqlType.text()),
                                     TableDefinition.Column('upstr_db_type', SqlType.text()),
                                     TableDefinition.Column('upstr_db_id', SqlType.text()),
                                     TableDefinition.Column('upstr_db_isEmbedded', SqlType.bool()),
                                     TableDefinition.Column('database_hostname', SqlType.text()),
                                     TableDefinition.Column('database_port', SqlType.text()),
                                     TableDefinition.Column('database_id', SqlType.text()),
                                     TableDefinition.Column('upstr_ds_name', SqlType.text()),
                                     TableDefinition.Column('upstr_ds_id', SqlType.text()),
                                     TableDefinition.Column('upstr_ds_project', SqlType.text()),
                                     TableDefinition.Column('emb_ds_name', SqlType.text()),
                                     TableDefinition.Column('summary_date', SqlType.text()),
                                     TableDefinition.Column('site_name', SqlType.text()),
                                     TableDefinition.Column('last_interaction_date', SqlType.text()),
                                     TableDefinition.Column('num_views', SqlType.double())
                                 ])
        # Step 5: Create the table in the connection catalog
        connection.catalog.create_table(schema)
        with Inserter(connection, schema) as inserter:
            for index, row in combined_df.iterrows():
                inserter.add_row(row)
            inserter.execute()
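        # optional verification (my addition): confirm the extract holds as many rows as the
        # DataFrame while the Hyper connection is still open
        extract_row_count = connection.execute_scalar_query(
            query=f'SELECT COUNT(*) FROM {schema.table_name}')
        print(f'Rows written to {PATH_TO_HYPER}: {extract_row_count} (DataFrame has {len(combined_df)})')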
print("The connection to the Hyper file is closed.")
# publish the .hyper extract to Tableau Server
# get information about your projects; you need a project ID to publish to
projects_df = get_projects_dataframe(conn)
print(projects_df[['name', 'id']])
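# if you would rather not paste the ID in by hand, you can look it up from the DataFrame above;
# 'My Impact Analysis Project' is a placeholder name of mine, substitute the project you actually use
# project_id = projects_df.loc[projects_df['name'] == 'My Impact Analysis Project', 'id'].iloc[0]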
response = conn.publish_data_source(datasource_file_path=PATH_TO_HYPER,
                                    datasource_name='Impact Analysis Milestone3',
                                    project_id='<YOUR_PROJECT_ID>')
# verify that the response indicates the datasource was published successfully
print(response.json())
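# the REST API answers with 201 (Created) on a successful publish; anything else usually points at
# a bad project ID or missing publish permissions on the target project
if response.status_code != 201:
    print(f'Publish did not succeed (status {response.status_code}); check the response body above')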
conn.sign_out()