Skip to content

Instantly share code, notes, and snippets.

@mattarderne
Last active February 2, 2021 07:46
Show Gist options
  • Save mattarderne/163060f149de85f9f011d8e225974cff to your computer and use it in GitHub Desktop.
Save mattarderne/163060f149de85f9f011d8e225974cff to your computer and use it in GitHub Desktop.
snowflake_bulk_create
import sqlalchemy as db
import pandas as pd
import snowflake.connector
import os
from snowflake.sqlalchemy import URL
from datetime import datetime
# import records
def connect(database, role='TRANSFORMER', warehouse='TRANSFORMING'):
    '''
    Open a Snowflake connection with the given warehouse and database selected.

    Credentials are read from the "user" and "pdw" environment variables
    (presumably populated from a .env file -- nothing in this function
    loads one, so the caller's environment must already contain them).

    Parameters:
        database:  name of the database selected with USE DATABASE.
        role:      Snowflake role passed to the connection URL.
        warehouse: Snowflake warehouse passed to the connection URL and
                   selected with USE WAREHOUSE.

    Returns:
        The open SQLAlchemy connection (not a cursor). The caller owns it
        and is responsible for closing it.

    Raises:
        Whatever the engine creation, the connect call or the USE statements
        raise; the exception propagates unchanged.
    '''
    engine = db.create_engine(
        URL(user=os.getenv("user"),
            password=os.getenv("pdw"),
            account='<account>',
            role=role,
            warehouse=warehouse))
    con = engine.connect()
    try:
        # NOTE: identifiers cannot be bound as query parameters, so they are
        # interpolated; only pass trusted warehouse/database names.
        con.execute(f'USE WAREHOUSE {warehouse}')
        con.execute(f'USE DATABASE {database}')
    except Exception:
        # Do not leak the half-configured connection when a USE fails.
        con.close()
        raise
    return con
def query_table(connection, database=None, schema=None, table=None, sql=None):
    '''
    Run a query on the given connection and return the result as a DataFrame.

    Parameters:
        connection: open database connection accepted by pandas.read_sql.
                    It is closed before this function returns, whether or
                    not the query succeeded, so it cannot be reused.
        database, schema, table:
                    used only when no sql is given, to build
                    SELECT * FROM "database"."schema"."table".
        sql:        explicit SQL text; when truthy it takes precedence over
                    database/schema/table.

    Returns:
        pandas.DataFrame holding the query result.

    Raises:
        Whatever pandas.read_sql raises for the query.
    '''
    if not sql:
        # NOTE: identifiers are interpolated, not bound; pass trusted names only.
        sql = f'''SELECT
*
FROM "{database}"."{schema}"."{table}"
'''
    try:
        return pd.read_sql(sql, connection)
    finally:
        # Close the connection that was passed in, not a module-level global.
        connection.close()
### Template for the generated view files (one .sqlx file per source table).
# Two placeholders are filled in with str.replace before a file is written:
#   SNOWFLAKE_TABLE_NAME_VAR -> the source table name
#   ALL_FIELD_NAMES          -> the column list, one "column," entry per line
# Every generated column entry ends with a comma, so the constant column
# `'' as hack` that follows ALL_FIELD_NAMES absorbs the trailing comma and
# keeps the SELECT list valid.
# The ${...} expressions are not touched by this script and are written to
# the output verbatim (presumably resolved later by whatever tool consumes
# the .sqlx files -- confirm).
template = """
config {
type: "view",
schema:"CANVAS_INSHOSTEDDATA"
}
with source as (
SELECT * FROM ${getRawDatabase()}.${canvas.CANVAS_INSHOSTEDDATA_SCHEMA}."SNOWFLAKE_TABLE_NAME_VAR"
),
renamed as (
select
ALL_FIELD_NAMES
'' as hack
from source
)
select * from renamed"""
# Driver: list every table in RAW_DEV.CANVAS_INSHOSTEDDATA and write one
# view definition per table to canvas/base/canvas_base_<table>.sqlx.
#
# query_table closes the connection it is handed, so a fresh connection is
# opened before every query. The connection is deliberately kept in the
# module-level name `con`.
con = connect('RAW_DEV', 'ROLE_DATA_ENGINEERING', 'WAREHOUSE_TRANSFORM_DEV')
tables_sql = """show tables like '%%' in RAW_DEV.CANVAS_INSHOSTEDDATA"""
columns_sql = """desc table RAW_DEV.CANVAS_INSHOSTEDDATA.SNOWFLAKE_TABLE_NAME_VAR;"""
results = query_table(con, 'RAW_DEV', 'CANVAS_INSHOSTEDDATA', '', tables_sql)

# Make sure the output directory exists so open() below cannot fail on a
# fresh checkout.
os.makedirs('canvas/base', exist_ok=True)

for name in results.name:
    print(name)

    ## get columns
    columns_query = columns_sql.replace('SNOWFLAKE_TABLE_NAME_VAR', name)
    con = connect('RAW_DEV', 'ROLE_DATA_ENGINEERING', 'WAREHOUSE_TRANSFORM_DEV')
    columns = query_table(con, 'RAW_DEV', 'CANVAS_INSHOSTEDDATA', '', columns_query)

    ## create columns list: one "column," entry per line; the trailing comma
    ## is absorbed by the `'' as hack` column in the template
    all_columns = ''.join(' ' + column + ',' + '\n' for column in columns.name)

    ## create SQL by replacing placeholders with variables
    sql_result = (template
                  .replace('SNOWFLAKE_TABLE_NAME_VAR', name)
                  .replace('ALL_FIELD_NAMES', all_columns))

    ## write to a file (file names use the lower-cased table name)
    name = name.lower()
    file_name = f'canvas/base/canvas_base_{name}.sqlx'
    with open(file_name, 'w') as f:
        f.write(sql_result)

print('done')
@mattarderne
Copy link
Author

mattarderne commented Jun 16, 2020

TODO:

  • Remove the hard-coded CANVAS_INSHOSTEDDATA from the script and replace it with a variable
  • Remove the pandas dependency
  • Add an option for choosing the preferred naming convention (col as col_name, or just col)
  • Fix the `'' as hack` workaround for the trailing comma
  • Clean up or remove the ${getRawDatabase()}.${canvas.CANVAS_INSHOSTEDDATA_SCHEMA}. JavaScript expression

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment