Skip to content

Instantly share code, notes, and snippets.

@abelsonlive
Last active August 31, 2023 00:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save abelsonlive/5666c4ad4f8922d127a257eaac8a24d9 to your computer and use it in GitHub Desktop.
Save abelsonlive/5666c4ad4f8922d127a257eaac8a24d9 to your computer and use it in GitHub Desktop.
Dynamically add partitions to a Redshift Spectrum external table
aws_access_key_id:
aws_secret_access_key:
rs_user:
rs_server:
rs_db:
rs_port:
rs_password:
import ast
import os
import subprocess
import sys

import s3plz
import yaml
from sqlalchemy import create_engine
from sqlalchemy import text
# External tables whose partitions are kept in sync. Each entry supplies the
# table-specific keyword arguments of SpectrumTable; the credentials and the
# database URL are filled in by main() from the config file.
TABLES = [
    {
        "s3path": "s3://bucket/path",
        "tablename": "device_events_json",
        "schemaname": "spectrum",
        "partitions": ["part_date", "part_hour"],
    },
]
def log(s):
    """
    Write the string ``s`` to standard error, followed by a newline.
    """
    line = "".join((s, "\n"))
    stream = sys.stderr
    stream.write(line)
class SpectrumTable(object):
    """
    A partitioned external table whose data files live under an S3 prefix.

    Compares the partition directories found under ``s3path`` with the
    partitions registered for ``schemaname.tablename`` in
    ``SVV_EXTERNAL_PARTITIONS`` and builds the ``ALTER TABLE ... ADD
    PARTITION`` statements for the ones that are missing.
    """

    def __init__(self, s3path, dbpath, tablename, schemaname, partitions, key, secret):
        """
        Open the S3 and database connections for one table.

        :param s3path: ``s3://bucket/prefix`` under which partition
            directories are stored.
        :param dbpath: database URL handed to ``sqlalchemy.create_engine``.
        :param tablename: name of the external table.
        :param schemaname: name of the schema containing the table.
        :param partitions: ordered list of partition column names; the
            directory levels below ``s3path`` are matched to them by position.
        :param key: AWS access key id passed to ``s3plz.connect``.
        :param secret: AWS secret access key passed to ``s3plz.connect``.
        """
        # First path component after the scheme is the bucket name.
        self.bucket = s3path.replace('s3://', '').split('/')[0]
        self.s3path = s3path
        self.dbpath = dbpath
        self.schemaname = schemaname
        self.tablename = tablename
        self.partitions = partitions
        self.s3 = s3plz.connect("s3://{}".format(self.bucket), key=key, secret=secret)
        self.db = create_engine(self.dbpath).connect()

    @staticmethod
    def _sql_text(value):
        """
        Render ``value`` as text safe to place between single quotes in SQL.
        """
        return "{}".format(value).replace("'", "''")

    @staticmethod
    def _parse_partition_values(raw):
        """
        Parse the ``values`` column of SVV_EXTERNAL_PARTITIONS into a list.

        The column is read as the text of a list literal (for example
        ``["2017-01-01","01"]``). ``ast.literal_eval`` accepts the same
        literals the previous ``eval`` call did but cannot execute code.

        :raises ValueError: if ``raw`` is not a list/tuple literal.
        """
        try:
            values = ast.literal_eval(raw)
        except (ValueError, SyntaxError):
            raise ValueError("unparseable partition values: {!r}".format(raw))
        if not isinstance(values, (list, tuple)):
            raise ValueError("partition values are not a list: {!r}".format(raw))
        return list(values)

    @property
    def s3_partitions(self):
        """
        Find partitions on S3.

        Yields each distinct partition once, as the ``/``-joined directory
        names that sit directly above the last path component of every entry
        listed under ``s3path`` (one directory level per partition column).
        """
        depth = len(self.partitions)
        seen = set()
        for p in self.s3.ls(self.s3path):
            # Drop the final component and keep the ``depth`` levels above it.
            part = '/'.join(p.split('/')[-(1 + depth):-1])
            if part not in seen:
                seen.add(part)
                yield part

    @property
    def redshift_partitions(self):
        """
        Find partitions in Redshift.

        Yields the ``/``-joined values of every partition registered for this
        table. The lookup is restricted to this table's schema so that a
        same-named table in another schema cannot hide missing partitions,
        and the names are sent as bound parameters rather than interpolated.
        """
        query = text("""
            SELECT values
            FROM SVV_EXTERNAL_PARTITIONS
            WHERE schemaname = :schemaname
              AND tablename = :tablename
        """)
        params = {"schemaname": self.schemaname, "tablename": self.tablename}
        for row in self.db.execute(query, params):
            # Only one column is selected, so positional access is unambiguous.
            yield '/'.join(self._parse_partition_values(row[0]))

    @property
    def new_partitions(self):
        """
        Find new partitions.

        Yields, as a list of values, every partition present on S3 that is
        not yet registered in Redshift.
        """
        registered = set(self.redshift_partitions)
        for p in self.s3_partitions:
            if p not in registered:
                yield p.split('/')

    def add_partition_query(self, partitions):
        """
        Query for adding a partition.

        :param partitions: partition values, in the same order as the
            partition columns given to the constructor.
        :returns: one ``ALTER TABLE ... ADD PARTITION`` statement whose
            location is ``s3path`` followed by the values as directories.
        :raises ValueError: if the number of values does not match the number
            of partition columns.
        """
        if len(partitions) != len(self.partitions):
            raise ValueError(
                "expected {} partition value(s) for {}.{}, got {!r}".format(
                    len(self.partitions), self.schemaname, self.tablename,
                    partitions))
        values = [self._sql_text(v) for v in partitions]
        spec = ", ".join(
            "{}='{}'".format(column, value)
            for column, value in zip(self.partitions, values))
        # Strip a trailing slash so the location never contains '//'.
        location = "/".join([self._sql_text(self.s3path.rstrip("/"))] + values)
        return "ALTER TABLE {0}.{1} ADD PARTITION({2}) LOCATION '{3}';".format(
            self.schemaname, self.tablename, spec, location)

    @property
    def update_query(self):
        """
        Query for updating this table.

        Space-joined ``ADD PARTITION`` statements for every new partition;
        an empty string when there is nothing to add.
        """
        return " ".join(
            self.add_partition_query(p) for p in self.new_partitions)
def main():
    """
    Add any missing partitions to every table listed in TABLES.

    Reads the YAML config file named by the first command-line argument,
    builds the pending ``ADD PARTITION`` statements for each table and runs
    them through ``psql``. Exits with status 1 if any ``psql`` run returned a
    non-zero status; the remaining tables are still processed.

    :raises Exception: if no config path is given or the config file does not
        contain a mapping.
    """
    if len(sys.argv) < 2:
        raise Exception("usage: python update_spectrum_tables.py {pg_config}")

    # parse configs
    # safe_load builds only plain data types; yaml.load without an explicit
    # Loader can construct arbitrary objects and is rejected by recent PyYAML.
    with open(sys.argv[1], 'rb') as config_file:
        config = yaml.safe_load(config_file)
    if not isinstance(config, dict):
        raise Exception("config file {} does not contain a mapping".format(sys.argv[1]))

    dbpath = "postgresql://{rs_user}:{rs_password}@{rs_server}:{rs_port}/{rs_db}".format(**config)

    # psql is invoked with an argument list (no shell), so quotes, '$' or
    # backticks in the query or credentials cannot be reinterpreted.
    psql_args = [
        "psql",
        "-U", str(config['rs_user']),
        "-h", str(config['rs_server']),
        "-p", str(config['rs_port']),
        "-d", str(config['rs_db']),
    ]
    # The password travels only in the child environment, never in the
    # logged command line.
    psql_env = dict(os.environ, PGPASSWORD=str(config['rs_password']))

    failed = []
    for table in TABLES:
        log("Updating partitions for {tablename}...".format(**table))
        # Copy the spec so credentials are not written into the shared
        # module-level TABLES entries.
        spec = dict(
            table,
            key=config['aws_access_key_id'],
            secret=config['aws_secret_access_key'],
            dbpath=dbpath,
        )
        qry = SpectrumTable(**spec).update_query.strip()
        if not qry:
            log("No new partitions!")
            continue

        log("Running command:")
        log(" ".join(psql_args) + ' -c "{}"'.format(qry))
        status = subprocess.call(psql_args + ["-c", qry], env=psql_env)
        if status != 0:
            log("psql exited with status {} for {}".format(status, table['tablename']))
            failed.append(table['tablename'])

    if failed:
        log("Failed to update: {}".format(", ".join(failed)))
        sys.exit(1)


if __name__ == '__main__':
    main()
@abelsonlive
Copy link
Author

usage:

python update_spectrum_tables.py config.yml

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment