Skip to content

Instantly share code, notes, and snippets.

@silberman
Last active May 4, 2022 17:08
Show Gist options
  • Save silberman/2d9454cf140cf39660b2dbd6aaedfde1 to your computer and use it in GitHub Desktop.
Save silberman/2d9454cf140cf39660b2dbd6aaedfde1 to your computer and use it in GitHub Desktop.
"""
Example script for loading data from a .csv file into your Mozart Snowflake instance.
To use, specify the 6 constants defined at the top:
SNOWFLAKE_ACCOUNT, SNOWFLAKE_USERNAME, SNOWFLAKE_PASSWORD, and SNOWFLAKE_DATABASE come from https://app.mozartdata.com/integrations/mode
TARGET_SCHEMA and TARGET_TABLE are where we'll be writing to
CSV_FILEPATH is the full path to the csv file we'll be uploading
Install snowflake-sqlalchemy: https://docs.snowflake.com/en/user-guide/sqlalchemy.html#installing-snowflake-sqlalchemy
tldr: pip install --upgrade snowflake-sqlalchemy
Notes:
* The table we're COPYing into has to already exist. This can be done in Snowflake's console
or in code by modifying the example table definition and using the create_table() function here.
* The credentials used will need to have write permissions, ie will need to a Mozart Admin
* This uses the "internal table stage": https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html#table-stages
"""
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
# Connection info - from https://app.mozartdata.com/export/mode
SNOWFLAKE_ACCOUNT = "???"
SNOWFLAKE_USERNAME = "???"
SNOWFLAKE_PASSWORD = "???"
SNOWFLAKE_DATABASE = "???"
# Schema and table in which we'll put this data
TARGET_SCHEMA = "csvs"
TARGET_TABLE = "???"
# Full path to the csv file we'll be uploading
CSV_FILEPATH = "???"
def make_snowflake_engine():
url = URL(
account=SNOWFLAKE_ACCOUNT,
user=SNOWFLAKE_USERNAME,
password=SNOWFLAKE_PASSWORD,
database=SNOWFLAKE_DATABASE
)
engine = create_engine(url)
return engine
def create_table():
engine = make_snowflake_engine()
with engine.begin() as conn:
result = conn.execute(f"CREATE SCHEMA IF NOT EXISTS {TARGET_SCHEMA}").fetchall()
print(result)
result = conn.execute(f"CREATE TABLE IF NOT EXISTS {TARGET_SCHEMA}.{TARGET_TABLE} (user_id number, name varchar, anumber number)").fetchall()
print(result)
def put_file_on_stage(filepath):
engine = make_snowflake_engine()
with engine.begin() as conn:
result = conn.execute(f"USE SCHEMA {TARGET_SCHEMA}").fetchall()
print(result)
result = conn.execute(f"put file://{filepath} @%{TARGET_TABLE} auto_compress=true").fetchall()
print(result)
result = conn.execute(f"copy into {TARGET_TABLE} file_format = (type = csv)").fetchall()
print(result)
if __name__ == "__main__":
# create_table()
put_file_on_stage(CSV_FILEPATH)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment