Last active
August 31, 2023 00:36
-
-
Save abelsonlive/5666c4ad4f8922d127a257eaac8a24d9 to your computer and use it in GitHub Desktop.
Dynamically add partitions to a Redshift Spectrum table
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
aws_access_key_id:
aws_secret_access_key:
rs_user:
rs_server:
rs_db:
rs_port:
rs_password:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
import subprocess
import sys

import s3plz
import yaml
from sqlalchemy import create_engine, text
# Spectrum tables to keep in sync. main() combines each entry with the
# credentials/dbpath from the config and passes it to SpectrumTable as kwargs:
#   s3path     -- S3 root of the table's data
#   tablename  -- external table name in Redshift
#   schemaname -- external schema the table belongs to
#   partitions -- partition column names, in directory-nesting order under s3path
TABLES = [
    dict(
        s3path="s3://bucket/path",
        tablename="device_events_json",
        schemaname="spectrum",
        partitions=["part_date", "part_hour"],
    ),
]
def log(s):
    """Emit *s* on stderr as a single newline-terminated diagnostic line."""
    stream = sys.stderr
    stream.write(s + "\n")
class SpectrumTable(object):
    """
    A Redshift Spectrum external table whose partitions live on S3.

    Compares the partition directories found under ``s3path`` with the
    partitions Redshift already has registered (SVV_EXTERNAL_PARTITIONS) and
    builds the ``ALTER TABLE ... ADD PARTITION`` statements for the missing ones.

    NOTE(review): ``self.db`` is opened in ``__init__`` and never closed by
    this class.
    """

    def __init__(self, s3path, dbpath, tablename, schemaname, partitions, key, secret):
        """
        :param s3path: S3 URL of the table root, e.g. ``s3://bucket/path``.
        :param dbpath: SQLAlchemy database URL of the Redshift cluster.
        :param tablename: name of the external table.
        :param schemaname: external schema holding the table.
        :param partitions: ordered partition column names; each maps to one
            directory level below ``s3path``.
        :param key: AWS access key id used to list S3.
        :param secret: AWS secret access key used to list S3.
        """
        self.bucket = s3path.replace('s3://', '').split('/')[0]
        self.s3path = s3path
        self.dbpath = dbpath
        self.schemaname = schemaname
        self.tablename = tablename
        self.partitions = partitions
        self.s3 = s3plz.connect("s3://{}".format(self.bucket), key=key, secret=secret)
        self.db = create_engine(self.dbpath).connect()

    @property
    def s3_partitions(self):
        """
        Yield each distinct partition found on S3 as a '/'-joined string of
        partition values (e.g. ``'2017-01-01/00'``).

        The values are the ``len(self.partitions)`` path segments immediately
        preceding the file name of every key returned by
        ``self.s3.ls(self.s3path)``. Data files are therefore expected to sit
        directly inside the deepest partition directory.
        """
        seen = set()
        depth = len(self.partitions)
        for p in self.s3.ls(self.s3path):
            # Keys ending in '/' are "folder" placeholder objects, not data
            # files. Slicing one would shift the window up a level and yield a
            # bogus partition such as 'path/2017-01-01'.
            if p.endswith('/'):
                continue
            part = '/'.join(p.split('/')[-(1 + depth):-1])
            if part not in seen:
                seen.add(part)
                yield part

    @property
    def redshift_partitions(self):
        """
        Yield the partitions Redshift already has registered for this table,
        in the same '/'-joined form as ``s3_partitions``.
        """
        # Bound parameters instead of string formatting. Filter on schema as
        # well, so a same-named table in another schema isn't picked up.
        res = self.db.execute(
            text("""
                SELECT values
                FROM SVV_EXTERNAL_PARTITIONS
                WHERE schemaname = :schemaname
                  AND tablename = :tablename
            """),
            {"schemaname": self.schemaname, "tablename": self.tablename},
        )
        for r in res:
            # `values` is returned as a JSON array string, e.g.
            # '["2017-01-01","00"]'. Parse it with json rather than eval(), so
            # database content is never executed as Python.
            yield '/'.join(json.loads(r[0]))

    @property
    def new_partitions(self):
        """
        Yield, as lists of values, every S3 partition not yet known to Redshift.
        """
        known = set(self.redshift_partitions)
        for p in self.s3_partitions:
            if p not in known:
                yield p.split('/')

    def add_partition_query(self, partitions):
        """
        Build the statement registering a single partition.

        :param partitions: partition values, one per column in
            ``self.partitions`` and in the same order.
        :returns: e.g. ``ALTER TABLE spectrum.t ADD PARTITION(part_date='2017-01-01',
            part_hour='00') LOCATION 's3://bucket/path/2017-01-01/00';``
        :raises ValueError: if the number of values differs from the number
            of partition columns.
        """
        values = [str(v) for v in partitions]
        if len(values) != len(self.partitions):
            raise ValueError(
                "expected {} partition values for {}, got {}: {!r}".format(
                    len(self.partitions), self.partitions, len(values), values))

        def quote(value):
            # Double embedded single quotes so a value can't end the literal early.
            return "'{}'".format(value.replace("'", "''"))

        spec = ", ".join(
            "{}={}".format(col, quote(val)) for col, val in zip(self.partitions, values))
        # rstrip avoids a '//' in the location when s3path ends with a slash.
        location = '/'.join([self.s3path.rstrip('/')] + values)
        return "ALTER TABLE {0}.{1} ADD PARTITION({2}) LOCATION {3};".format(
            self.schemaname, self.tablename, spec, quote(location))

    @property
    def update_query(self):
        """
        All ADD PARTITION statements for new partitions, separated by spaces.
        The result is an empty string when there is nothing to add.
        """
        return " ".join(map(self.add_partition_query, self.new_partitions))
def main():
    """
    Register any new S3 partitions for every table listed in TABLES.

    Usage: python update_spectrum_tables.py <config.yml>

    The YAML config must define aws_access_key_id, aws_secret_access_key,
    rs_user, rs_password, rs_server, rs_port and rs_db. The generated
    ALTER TABLE statements are executed with the ``psql`` CLI rather than the
    SQLAlchemy connection. Presumably this is because Redshift refuses
    ALTER TABLE on external tables inside a transaction block — confirm before
    switching to SQLAlchemy.

    Exits with status 1 if any psql invocation fails.
    """
    if len(sys.argv) < 2:
        sys.exit("usage: python update_spectrum_tables.py {pg_config}")

    # safe_load: the config holds plain scalars only. yaml.load without an
    # explicit Loader can build arbitrary Python objects, and PyYAML >= 6
    # rejects the call outright.
    with open(sys.argv[1], 'rb') as f:
        config = yaml.safe_load(f)

    dbpath = "postgresql://{rs_user}:{rs_password}@{rs_server}:{rs_port}/{rs_db}".format(**config)

    # argv list (no shell): quotes, '$' or backticks in partition values
    # cannot be reinterpreted by a shell. str() because YAML parses the port
    # as an int.
    psql_cmd = [
        "psql",
        "-U", str(config['rs_user']),
        "-h", str(config['rs_server']),
        "-p", str(config['rs_port']),
        "-d", str(config['rs_db']),
    ]
    # The password goes through the environment, so it never shows up in the
    # command line that gets logged below.
    psql_env = dict(os.environ)
    psql_env['PGPASSWORD'] = str(config['rs_password'])

    failed = False
    for table in TABLES:
        log("Updating partitions for {tablename}...".format(**table))
        # Work on a copy so credentials are not written into the module-level
        # TABLES entries.
        kwargs = dict(table)
        kwargs['key'] = config['aws_access_key_id']
        kwargs['secret'] = config['aws_secret_access_key']
        kwargs['dbpath'] = dbpath
        qry = SpectrumTable(**kwargs).update_query.strip()
        if qry != "":
            cmd = psql_cmd + ["-c", qry]
            log("Running command:")
            log(" ".join(cmd))
            status = subprocess.call(cmd, env=psql_env)
            if status != 0:
                # Keep going with the remaining tables, but report the failure.
                log("psql exited with status {}".format(status))
                failed = True
        else:
            log("No new partitions!")

    if failed:
        sys.exit(1)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
usage: