Created
August 21, 2019 19:12
-
-
Save monipip3/f52edcee3fd3c04bb9b910ad58867889 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Created on Thu Aug 1 11:28:12 2019 | |
@author: mpuerto | |
""" | |
# This file reads a CSV, infers the data type of each column, then creates a SQL CREATE TABLE statement and populates the table from your S3 file
import csv, ast, psycopg2, os | |
# Open the CSV whose header row and values drive the generated table.
# newline="" hands newline handling to the csv module, so quoted fields that
# contain line breaks are parsed as one field instead of being split.
f = open(r"C:\Users\mpuerto\Downloads\x.csv", newline="")  # reference the file you want
reader = csv.reader(f, delimiter=',')
# Per-column state filled in while scanning the file: longest cell length
# seen, column names, and the SQL type inferred so far.
longest, headers, type_list = [], [], []
print("I have the csv now")
def dataType(val, current_type):
    """Infer the SQL type for one CSV cell, widening the column's current type.

    Args:
        val: Raw cell text read from the CSV.
        current_type: Type inferred so far for the column: '' when nothing
            has been inferred yet, otherwise a value previously returned by
            this function ('float' and 'int' are also accepted as inputs).

    Returns:
        One of 'smallint', 'bigint', 'decimal' or 'varchar'. A column that is
        already 'varchar' or 'decimal' is never narrowed back to an integer
        type, and a string is always returned.
    """
    try:
        # literal_eval turns numeric text into int/float; ordinary text
        # raises, which marks the cell as a string.
        t = ast.literal_eval(val)
    except (ValueError, SyntaxError):
        return 'varchar'

    # Exact type checks on purpose: bool cells ("True"/"False") and any other
    # literal (lists, quoted strings, ...) are stored as text.
    if current_type == 'varchar' or type(t) not in (int, float):
        return 'varchar'

    # A fractional value anywhere in the column keeps the whole column decimal.
    # NOTE(review): an unqualified DECIMAL may default to scale 0 on the target
    # database - confirm fractional digits survive the load.
    if type(t) is float or current_type in ('float', 'decimal'):
        return 'decimal'

    # Use the smallest integer type that fits; the bounds are inclusive.
    # The intermediate 'int' tier is disabled, so anything beyond smallint
    # (or a column already widened past it) becomes bigint.
    if -32768 <= t <= 32767 and current_type not in ('int', 'bigint'):
        return 'smallint'
    return 'bigint'
# Scan the CSV: the first row supplies the column names, every later row
# widens the inferred type of each of its columns.
try:
    for row in reader:
        if not headers:
            # Turn header text into column names: '-' and '+' become '_'.
            headers = [x.replace("-", "_").replace("+", "_").strip() for x in row]
            for _ in row:
                longest.append(0)
                type_list.append('')
            continue
        for i, cell in enumerate(row):
            # NOTE: no null marker (such as 'NA') is special-cased here; any
            # non-numeric text turns the column into varchar.
            # A varchar column cannot narrow again, so inference is skipped.
            if type_list[i] != 'varchar':
                type_list[i] = dataType(cell, type_list[i])
            # Track the widest cell seen in each column.
            if len(cell) > longest[i]:
                longest[i] = len(cell)
finally:
    # Close the CSV even when a bad row aborts the scan.
    f.close()
print('Created functions')
# Build the CREATE TABLE statement from the inferred column names and types.
# Each column is formatted on its own and the pieces are joined afterwards, so
# braces in a header are never re-read as format placeholders and no trailing
# comma has to be sliced off. A column with no inferred type (for example a
# file with no data rows) falls back to varchar.
column_defs = [
    '\n{} {}'.format(name.lower(), col_type or 'varchar')
    for name, col_type in zip(headers, type_list)
]
statement = 'create table x (' + ','.join(column_defs) + ');'  # use your schema and table name you want
# Connect using settings taken from environment variables, then create the
# table. Every single-quote character is removed from the environment values
# before they are passed to the driver.
conn = psycopg2.connect(
    host=os.environ['db_host'].replace("'", ""),
    user=os.environ['db_user'].replace("'", ""),
    port=5439,
    password=os.environ['db_password'].replace("'", ""),
    dbname='analytics',
)
print("Made the connection")
cur = conn.cursor()
cur.execute(statement)
conn.commit()
print("Created the table")
# Populate the table created above from a CSV file in S3. Fill in the S3 path
# and table name; the AWS keys are read from environment variables, with every
# single-quote character removed.
sql = """copy x from 's3://'
WITH CREDENTIALS 'aws_access_key_id={0};aws_secret_access_key={1}'
REGION 'us-east-1'
delimiter ','
ignoreheader 1
emptyasnull
ACCEPTANYDATE
csv;""".format(
    os.environ['aws_access_key'].replace("'", ""),
    os.environ['aws_secret_access_key'].replace("'", ""),
)
try:
    cur.execute(sql)
    conn.commit()
    print("Uploaded data into table")
finally:
    # Release the cursor and connection whether or not the load succeeded.
    cur.close()
    conn.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I'd like to use this script. How should I attribute it to you?