Skip to content

Instantly share code, notes, and snippets.

@mmautner
Last active July 11, 2023 17:41
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mmautner/c102bff31d386679f021 to your computer and use it in GitHub Desktop.
Redshift Upserts w/ Python
import uuid
import psycopg2
from secret import REDSHIFT_CREDS
from secret import AWS_ACCESS_KEY, AWS_SECRET_KEY
def get_primary_keys(tablename, db):
    """Return the column names of the first index defined on *tablename*.

    Looks up ``pg_indexes.indexdef`` for the table and parses the
    parenthesized column list out of the ``CREATE ... INDEX ... (col, ...)``
    definition text.

    Args:
        tablename: name of the table to inspect (str).
        db: an open DB-API connection (e.g. psycopg2 connection).

    Returns:
        list[str] of column names, with surrounding double-quotes stripped.

    Raises:
        ValueError: if the table has no index to derive key columns from.
    """
    c = db.cursor()
    # Parameterized query instead of "%" string interpolation: prevents
    # SQL injection through tablename and handles quoting correctly.
    c.execute("select indexdef from pg_indexes where tablename = %s;",
              (tablename,))
    rows = c.fetchall()
    if not rows:
        # Previously this crashed with a bare IndexError on [0][0].
        raise ValueError("no index found for table %r" % tablename)
    indexdef = rows[0][0]
    # indexdef looks like: CREATE UNIQUE INDEX idx ON tbl ("col1", "col2")
    rfields = indexdef.split('(')[1].strip(')').split(',')
    return [field.strip().strip('"') for field in rfields]
def load_s3_file_into_redshift(s3path, tablename, delimiter='|'):
    """Upsert a gzipped, delimited file from S3 into a Redshift table.

    Strategy: COPY the file into a temporary table shaped LIKE the target,
    then inside a transaction delete the target rows whose key columns
    match the staged rows, and insert everything from the staging table.

    Args:
        s3path: location of the gzipped file on Amazon S3 (str).
        tablename: destination Redshift table (str).
        delimiter: field delimiter used in the file (str, default '|').

    Returns:
        s3path, unchanged (useful for chaining/logging).
    """
    # uuid4().hex -- the original used .get_hex(), which was removed in
    # Python 3 and raises AttributeError there.
    temp_tablename = 'temp_%s' % uuid.uuid4().hex
    db = psycopg2.connect(**REDSHIFT_CREDS)
    try:
        primary_keys = get_primary_keys(tablename, db)
        # Join staging to destination on every key column:
        # dest.pk = src.pk AND dest.pk2 = src.pk2 ...
        equals_clause = '{dest}.%s = {src}.%s'
        join_clause = ' AND '.join(equals_clause % (pk, pk)
                                   for pk in primary_keys)
        join_clause = join_clause.format(dest=tablename, src=temp_tablename)
        # NOTE(review): tablename/s3path/credentials are interpolated into
        # the SQL text; COPY cannot be parameterized, so callers must not
        # pass untrusted values here.
        upsert_qry = """\
CREATE TEMPORARY TABLE {src} (LIKE {dest});
COPY {src} FROM '{s3path}'
CREDENTIALS 'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
DELIMITER '{delimiter}' gzip;
BEGIN;
LOCK {dest};
DELETE FROM {dest} USING {src} WHERE {join_clause};
INSERT INTO {dest} SELECT * FROM {src};
END;
""".format(dest=tablename,
           src=temp_tablename,
           s3path=s3path,
           join_clause=join_clause,
           access_key=AWS_ACCESS_KEY,
           secret_key=AWS_SECRET_KEY,
           delimiter=delimiter)
        c = db.cursor()
        c.execute(upsert_qry)
        db.commit()
    finally:
        # The original leaked the connection; always release it.
        db.close()
    return s3path
@Uzzije
Copy link

Uzzije commented Nov 3, 2022

Thanks for this!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment