@josepablog
Forked from TomAugspurger/to_redshift.py
Last active August 10, 2021 02:22
to_redshift.py
# NOTE: this module targets Python 2 (it relies on cStringIO and the old bytes/str model).
import codecs
import cStringIO
import gzip
from functools import wraps
from io import BytesIO

import boto3
import psycopg2
from pandas import DataFrame
from pandas.io.sql import SQLTable, pandasSQL_builder
from sqlalchemy import MetaData


def monkeypatch_method(cls):
    """
    Create a decorator that monkey-patches a class with the decorated function.
    Recipe from: https://mail.python.org/pipermail/python-dev/2008-January/076194.html
    Args:
        cls: the class to attach the decorated function to
    Returns:
        A decorator that sets the function as an attribute of `cls` and
        returns the function unchanged.
    """
    @wraps(cls)
    def decorator(func):
        setattr(cls, func.__name__, func)
        return func
    return decorator
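
# Illustration (hypothetical example, not part of the original recipe): decorating a
# function with @monkeypatch_method(DataFrame) runs setattr(DataFrame, func.__name__, func),
# so the function becomes callable as an instance method, e.g.:
#
#   @monkeypatch_method(DataFrame)
#   def shout(self):
#       return "I have {} rows".format(len(self))
#
#   DataFrame({"a": [1, 2]}).shout()  # -> "I have 2 rows"
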
def resolve_qualname(table_name, schema=None):
    """Return the schema-qualified table name, e.g. 'schema.table_name'."""
    name = '.'.join([schema, table_name]) if schema is not None else table_name
    return name


@monkeypatch_method(DataFrame)
def to_redshift(self, table_name, engine, s3_bucket, s3_keypath=None,
                schema=None, if_exists='fail', index=True,
                compress=True, aws_access_key_id=None, aws_secret_access_key=None,
                null_as=None, emptyasnull=True):
    """
    Insert the DataFrame into Redshift by staging it as a file in S3.
    Args:
        self: pandas DataFrame to insert into Redshift
        table_name: name of the target table
        engine: an SQLAlchemy engine object
        s3_bucket: S3 bucket name
        s3_keypath: key path in S3 (without the bucket name)
        schema: Redshift schema
        if_exists: {'fail', 'append', 'replace'}
        index: bool; whether to include the DataFrame's index
        compress: bool; gzip the data before uploading it to S3
        aws_access_key_id: resolved from the default boto3 credentials if None
        aws_secret_access_key: resolved from the default boto3 credentials if None
        null_as: treat these values as NULL (not tested)
        emptyasnull: bool; whether '' is inserted as NULL
    Returns:
        None
    """
    url, aws_access_key_id, aws_secret_access_key = self.to_s3(
        bucket=s3_bucket, keypath=s3_keypath, index=index,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key, compress=compress)
    qualname = resolve_qualname(table_name, schema)
    table = SQLTable(table_name, pandasSQL_builder(engine, schema=schema),
                     self, if_exists=if_exists, index=index)

    # Build the statements to run: create/replace the table, then COPY from S3.
    print("Creating table {}".format(qualname))
    if table.exists():
        if if_exists == 'fail':
            raise ValueError("Table Exists")
        elif if_exists == 'append':
            queue = []
        elif if_exists == 'replace':
            queue = ['drop table {};'.format(qualname), table.sql_schema() + ";"]
        else:
            raise ValueError("Bad option for `if_exists`")
    else:
        queue = [table.sql_schema() + ";"]

    stmt = ("COPY {qualname}\n"
            "FROM 's3://{keypath}' \n"
            "CREDENTIALS 'aws_access_key_id={key};aws_secret_access_key={secret}' "
            "{gzip} "
            "{null_as} "
            "{emptyasnull} "
            "CSV IGNOREHEADER 1;").format(
                qualname=qualname,
                keypath=url,
                key=aws_access_key_id,
                secret=aws_secret_access_key,
                gzip="GZIP " if compress else " ",
                null_as="NULL AS '{}'".format(null_as) if null_as is not None else "",
                emptyasnull="EMPTYASNULL " if emptyasnull else " ")
    queue.append(stmt)

    print("Querying Redshift...")
    with engine.begin() as con:
        for stmt in queue:
            print(stmt)
            con.execute(stmt)
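
# Example of the SQL this builds (illustrative values only): for
# qualname='public.my_table', a gzipped upload at 'my_bucket/dir/file.gzip',
# null_as=None, and emptyasnull=True, the COPY statement is roughly:
#
#   COPY public.my_table
#   FROM 's3://my_bucket/dir/file.gzip'
#   CREDENTIALS 'aws_access_key_id=...;aws_secret_access_key=...' GZIP  EMPTYASNULL CSV IGNOREHEADER 1;
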

@monkeypatch_method(DataFrame)
def to_s3(self, bucket, keypath, index=True, compress=True, encoding="ascii",
          aws_access_key_id=None, aws_secret_access_key=None):
    """
    Write the DataFrame to S3 as an (optionally gzipped) CSV file.
    Args:
        self: DataFrame to upload
        bucket: S3 bucket name
        keypath: S3 key path (without the bucket name)
        index: whether to include the DataFrame's index
        compress: whether to gzip the data before uploading it
        encoding: CSV encoding, ASCII by default
        aws_access_key_id: resolved from the default boto3 credentials if None
        aws_secret_access_key: resolved from the default boto3 credentials if None
    Returns:
        The S3 URL of the file, and the credentials used to upload it.
    """
    print("Exporting to S3...")
    # Figure out paths:
    keypath = "{filename}.{ext}".format(filename=keypath, ext="gzip" if compress else "csv")
    url = bucket + '/' + keypath

    # Create the S3 session and resolve the credentials actually in use,
    # so they can be reused in the Redshift COPY statement.
    session = boto3.Session(aws_access_key_id=aws_access_key_id,
                            aws_secret_access_key=aws_secret_access_key)
    aws_access_key_id = session.get_credentials().access_key
    aws_secret_access_key = session.get_credentials().secret_key
    s3 = session.resource('s3')
    obj = s3.Object(bucket_name=bucket, key=keypath)

    # Create an in-memory file that allows unicode:
    buffer = cStringIO.StringIO()
    codecinfo = codecs.lookup("utf8")
    fp = codecs.StreamReaderWriter(buffer, codecinfo.streamreader, codecinfo.streamwriter)

    # Write the CSV, then gzip it if requested.
    gzfp = BytesIO()
    self.to_csv(fp, index=index, encoding=encoding)
    if compress:
        print("Compressing")
        fp.seek(0)
        gzipped = gzip.GzipFile(fileobj=gzfp, mode='w')
        gzipped.write(bytes(fp.read()))
        gzipped.close()
        gzfp.seek(0)
    else:
        gzfp = fp
        gzfp.seek(0)

    print("Uploading to {}".format(url))
    obj.upload_fileobj(gzfp)
    return url, aws_access_key_id, aws_secret_access_key
josepablog (Author) commented Sep 9, 2016

Summary

This code modifies the pandas DataFrame class so that a DataFrame can be uploaded to Redshift via S3. The original code had errors that I fixed, and I also refactored it to make it more readable.

Usage

import pandas as pd
from sqlalchemy import create_engine
import to_redshift

CONN_STR = "redshift+psycopg2://<SOME_CONNECTION_STRING>" 
red_engine = create_engine(CONN_STR)

df = pd.read_csv("my_file.csv")
df.to_redshift(table_name="my_table",  # SQL Target table
               engine=red_engine,      # SQL engine
               s3_bucket="my_bucket",   # S3 intermediate bucket
               s3_keypath="dir/file",
               if_exists='replace')
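
The monkey-patched to_s3 helper can also be used on its own to stage a DataFrame in S3 without running the Redshift COPY. A minimal sketch (bucket and key path are placeholders):

url, key_id, secret = df.to_s3(bucket="my_bucket",   # S3 bucket
                               keypath="dir/file",   # key path without the bucket
                               compress=True)        # uploads dir/file.gzip
print(url)  # "my_bucket/dir/file.gzip"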
