Created
October 12, 2016 14:24
-
-
Save sjanahan/9d7522117094933643dcbded1a921d1c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import boto3 | |
import urllib | |
import os | |
import psycopg2 | |
# Runs once, when the module is first imported.
print("Loading function")

# Module-level S3 handles, built at import time and shared by the handler:
# the low-level client fetches object bodies, the resource exposes object
# metadata.
s3_client = boto3.client(service_name="s3")
s3_object = boto3.resource(service_name="s3")
def _decode_s3_key(raw_key):
    """Return the URL-decoded object key taken from an S3 event record."""
    try:
        from urllib.parse import unquote_plus
    except ImportError:
        # Python 2: unquote_plus lives in urllib and operates on byte strings.
        from urllib import unquote_plus
        raw_key = raw_key.encode('utf8')
    return unquote_plus(raw_key)


def _quote_identifier(name):
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"%s"' % name.replace('"', '""')


def _read_s3_text(bucket_name, key):
    """Return the UTF-8 decoded body of the S3 object at *bucket_name*/*key*."""
    response = s3_client.get_object(Bucket=bucket_name, Key=key)
    return response['Body'].read().decode("utf-8")


def lambda_handler(event, context):
    """Reload a Redshift table from the S3 object named in *event*.

    The table is emptied and then re-populated with a COPY from the object.
    The schema name is the bucket name and the table name is the object's
    file name without its extension. Connection settings and the IAM role
    used by COPY are read from ``<key directory>/credentials/redshift_url``
    and ``<key directory>/credentials/redshift_iam_role`` in the same bucket.

    Args:
        event: S3 object-created notification; only ``Records[0]`` is read.
        context: Lambda context object (unused).

    Returns:
        None.

    Raises:
        Errors raised while reading from S3 propagate to the caller. Errors
        raised while talking to Redshift are printed and swallowed.
    """
    print("inside lambda handler")

    # extracting information from the objectCreated event from s3
    event_metadata = event['Records'][0]['s3']
    bucket_name = event_metadata['bucket']['name']
    key_name = _decode_s3_key(event_metadata['object']['key'])

    s3_key_last_modified = s3_object.Object(bucket_name, key_name).last_modified
    print("%s last modified at: %s" % (key_name, s3_key_last_modified))

    tallboy_application_environment = os.path.dirname(key_name)
    table_name = os.path.splitext(os.path.basename(key_name))[0]
    schema_name = bucket_name  # on purpose, these should be same

    # building the dsn to connect to redshift with: the credentials file holds
    # one setting per line, psycopg2 wants them separated by spaces
    redshift_url_key = '%s/credentials/redshift_url' % tallboy_application_environment
    dsn = " ".join(_read_s3_text(bucket_name, redshift_url_key).splitlines())

    # building the credentials necessary for role to write to redshift
    iam_role_key = '%s/credentials/redshift_iam_role' % tallboy_application_environment
    iam_role = _read_s3_text(bucket_name, iam_role_key).strip()

    # Identifiers cannot be sent as query parameters, so they are quoted here.
    qualified_table = "%s.%s" % (
        _quote_identifier(schema_name),
        _quote_identifier(table_name),
    )

    conn = None
    try:
        print("Attempting to connect to redshift...")
        conn = psycopg2.connect(dsn)
        print("Connected to redshift!")

        with conn.cursor() as cur:
            # we are deleting here instead of truncating because
            # in redshift you need to be a superuser to truncate a table
            delete_cmd = """
                DELETE %s
            """ % qualified_table
            print("Sequentially deleting rows from %s.%s" % (schema_name, table_name))
            cur.execute(delete_cmd)

            # copy from s3 into redshift. The path and credentials are passed
            # as query parameters so psycopg2 quotes them; "%%s" survives the
            # formatting below as the "%s" placeholder psycopg2 expects, and
            # any "%" in the table name is doubled for the same reason.
            copy_cmd = """
                COPY %s
                FROM %%s
                CREDENTIALS %%s
                DELIMITER ',' ESCAPE REMOVEQUOTES
                TIMEFORMAT AS 'auto' DATEFORMAT AS 'auto'
                IGNOREHEADER AS 1
                TRUNCATECOLUMNS;
            """ % qualified_table.replace("%", "%%")
            copy_params = (
                's3://%s/%s' % (bucket_name, key_name),
                'aws_iam_role=%s' % iam_role,
            )
            print("Copying %s from s3 into redshift" % (key_name))
            cur.execute(copy_cmd, copy_params)

        # psycopg2 opens a transaction implicitly; without this commit the
        # DELETE and COPY are discarded when the connection goes away.
        conn.commit()
    except Exception as e:
        print("-> Redshift error: %s" % e)
    finally:
        # Closing releases the connection and drops any uncommitted work.
        if conn is not None:
            conn.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment