Forked from josepablog/to_redshift.py
to_redshift.py
import gzip
from functools import wraps
from io import BytesIO, StringIO

import boto3
import psycopg2  # Postgres/Redshift driver used by the SQLAlchemy engine
from pandas import DataFrame
from pandas.io.sql import SQLTable, pandasSQL_builder
from sqlalchemy import MetaData
def monkeypatch_method(cls):
    """
    Creates a decorator for monkey-patching a method onto a class.

    Recipe from:
    https://mail.python.org/pipermail/python-dev/2008-January/076194.html

    Args:
        cls: the class to patch

    Returns:
        A decorator that attaches the decorated function to ``cls`` under the
        function's own name.
    """
    @wraps(cls)
    def decorator(func):
        setattr(cls, func.__name__, func)
        return func
    return decorator
def resolve_qualname(table_name, schema=None):
    """Returns the schema-qualified table name, e.g. ``schema.table_name``."""
    name = '.'.join([schema, table_name]) if schema is not None else table_name
    return name
@monkeypatch_method(DataFrame)
def to_redshift(self, table_name, engine, s3_bucket, s3_keypath=None,
                schema=None, if_exists='fail', index=True,
                compress=True, aws_access_key_id=None, aws_secret_access_key=None,
                null_as=None, emptyasnull=True):
    """
    Inserts the DataFrame into Redshift by staging a file in S3 and COPYing it.

    Args:
        self: pandas DataFrame to insert into Redshift
        table_name: name of the destination table
        engine: a SQLAlchemy engine object
        s3_bucket: S3 bucket name
        s3_keypath: key path in S3 (without the bucket name)
        schema: Redshift schema
        if_exists: {'fail', 'append', 'replace'}
        index: bool; whether to include the DataFrame's index
        compress: bool; gzip the data before uploading it to S3
        aws_access_key_id: taken from the default boto3 credential chain by default
        aws_secret_access_key: taken from the default boto3 credential chain by default
        null_as: treat these values as NULL (not tested)
        emptyasnull: bool; whether '' is loaded as NULL

    Returns:
        None
    """
    url, aws_access_key_id, aws_secret_access_key = self.to_s3(
        bucket=s3_bucket, keypath=s3_keypath, index=index,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key, compress=compress)
    qualname = resolve_qualname(table_name, schema)
    table = SQLTable(table_name, pandasSQL_builder(engine, schema=schema),
                     self, if_exists=if_exists, index=index)
    print("Creating table {}".format(qualname))

    # Queue up the DDL to run before the COPY, depending on `if_exists`:
    if table.exists():
        if if_exists == 'fail':
            raise ValueError("Table Exists")
        elif if_exists == 'append':
            queue = []
        elif if_exists == 'replace':
            queue = ['drop table {};'.format(qualname), table.sql_schema() + ";"]
        else:
            raise ValueError("Bad option for `if_exists`")
    else:
        queue = [table.sql_schema() + ";"]

    # Load the staged S3 file into the table:
    stmt = ("COPY {qualname}\n"
            "FROM 's3://{keypath}' \n"
            "CREDENTIALS 'aws_access_key_id={key};aws_secret_access_key={secret}' "
            "{gzip} "
            "{null_as} "
            "{emptyasnull}"
            "CSV IGNOREHEADER 1;").format(qualname=qualname,
                                          keypath=url,
                                          key=aws_access_key_id,
                                          secret=aws_secret_access_key,
                                          gzip="GZIP " if compress else " ",
                                          null_as="NULL AS '{}'".format(null_as) if null_as is not None else "",
                                          emptyasnull="EMPTYASNULL " if emptyasnull else " ")
    queue.append(stmt)

    print("Querying Redshift...")
    with engine.begin() as con:
        for stmt in queue:
            print(stmt)
            con.execute(stmt)
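
# A minimal usage sketch for the patch above (all names below are placeholders,
# not part of the original gist): given a SQLAlchemy engine pointed at a
# Redshift cluster and an S3 bucket the cluster can read from, a call might
# look like:
#
#   from sqlalchemy import create_engine
#   engine = create_engine("postgresql+psycopg2://user:password"
#                          "@my-cluster.example.com:5439/mydb")
#   df.to_redshift("events", engine, s3_bucket="my-staging-bucket",
#                  s3_keypath="tmp/events", schema="public",
#                  if_exists="replace", index=False)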
@monkeypatch_method(DataFrame)
def to_s3(self, bucket, keypath, index=True, compress=True, encoding="ascii",
          aws_access_key_id=None, aws_secret_access_key=None):
    """
    Writes the DataFrame to S3 as a CSV file (gzipped when ``compress`` is True).

    Args:
        self: DataFrame to upload
        bucket: S3 bucket name
        keypath: key path in S3 (without the bucket name)
        index: whether to include the DataFrame's index
        compress: whether to gzip the data before uploading it
        encoding: ASCII by default
        aws_access_key_id: taken from the default boto3 credential chain by default
        aws_secret_access_key: taken from the default boto3 credential chain by default

    Returns:
        The S3 URL of the file and the credentials used to upload it.
    """
    print("Exporting to S3...")

    # Figure out paths:
    keypath = "{filename}.{ext}".format(filename=keypath, ext="gzip" if compress else "csv")
    url = bucket + '/' + keypath

    # Create an S3 session; boto3 falls back to its default credential chain
    # when no keys are passed explicitly.
    session = boto3.Session(aws_access_key_id=aws_access_key_id,
                            aws_secret_access_key=aws_secret_access_key)
    aws_access_key_id = session.get_credentials().access_key
    aws_secret_access_key = session.get_credentials().secret_key
    s3 = session.resource('s3')
    obj = s3.Object(bucket_name=bucket, key=keypath)

    # Write the CSV into an in-memory text buffer:
    fp = StringIO()
    self.to_csv(fp, index=index, encoding=encoding)

    # Encode (and optionally gzip) the CSV into a bytes buffer for upload:
    gzfp = BytesIO()
    if compress:
        print("Compressing")
        fp.seek(0)
        with gzip.GzipFile(fileobj=gzfp, mode='wb') as gzipped:
            gzipped.write(fp.read().encode(encoding))
        gzfp.seek(0)
    else:
        gzfp = BytesIO(fp.getvalue().encode(encoding))

    print("Uploading to {}".format(url))
    obj.upload_fileobj(gzfp)
    return url, aws_access_key_id, aws_secret_access_key
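
# A runnable sketch of the S3 staging step on its own. The bucket and key names
# are placeholders (not part of the original gist), and real AWS credentials
# must be resolvable through boto3's default credential chain for the upload
# to succeed.
if __name__ == "__main__":
    df = DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
    url, key, secret = df.to_s3(bucket="my-staging-bucket",
                                keypath="tmp/example",
                                index=False, compress=True)
    print("Staged file at s3://{}".format(url))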