Skip to content

Instantly share code, notes, and snippets.

@dlstadther
Last active June 9, 2021 07:35
Show Gist options
  • Save dlstadther/f835a234f54a46728412 to your computer and use it in GitHub Desktop.
Save dlstadther/f835a234f54a46728412 to your computer and use it in GitHub Desktop.
Using Luigi's Redshift and S3
# import python core modules
import datetime
import os
import shutil

# import luigi modules
import luigi
from luigi import configuration, s3
from luigi.contrib import redshift
# Aliases for the two modules above: the Config classes named `redshift` and
# `s3` defined later in this file rebind those names, so the modules are only
# reachable afterwards through these aliases.
from luigi import s3 as luigi_s3
from luigi.contrib import redshift as luigi_redshift
# meta data
__author__ = 'Dillon Stadther'
__date__ = '2016-01-06'
class redshift(luigi.Config):
    """Redshift connection settings pulled from luigi.cfg.

    Luigi's config system fills these parameters from the config section
    whose tag matches this class name, hence the lowercase name.

    NOTE: defining this class rebinds the module-level name ``redshift``,
    which was previously the ``luigi.contrib.redshift`` module import.
    """

    # Every setting falls back to an empty string when absent from luigi.cfg.
    host = luigi.Parameter(default="")
    database = luigi.Parameter(default="")
    user = luigi.Parameter(default="")
    password = luigi.Parameter(default="")
class s3(luigi.Config):
    """AWS credentials pulled from luigi.cfg.

    Luigi's config system fills these parameters from the config section
    whose tag matches this class name, hence the lowercase name.

    NOTE: defining this class rebinds the module-level name ``s3``, which
    was previously the ``luigi.s3`` module import.
    """

    # Both settings fall back to an empty string when absent from luigi.cfg.
    aws_access_key_id = luigi.Parameter(default="")
    aws_secret_access_key = luigi.Parameter(default="")
class ExampleTask(luigi.WrapperTask):
    """Wrapper task that schedules one MoveS3 task per table/file job."""

    # Default to the current UTC date. On a machine whose clock already runs
    # in UTC, datetime.date.today() would give the same value.
    # The previous form, datetime.date(datetime.datetime.utcnow()), raised a
    # TypeError because date() expects integer year/month/day arguments.
    # NOTE: the default is computed once, when this class body is executed.
    date = luigi.DateParameter(default=datetime.datetime.utcnow().date())

    def requires(self):
        """Yield the MoveS3 task for each configured job.

        Anything returned or yielded by requires() must have a 'true'
        complete() method (aka successful output) before this class's run
        method will execute.
        """
        jobs = [
            {'table': 'my_table', 'fn': 'my_file'},
            {'table': 'your_table', 'fn': 'your_file'},
        ]
        for job in jobs:
            yield MoveS3(date=self.date, table=job['table'], fn=job['fn'])
class MoveS3(luigi.Task):
    """Move a single loaded file from the load bucket to the complete bucket.

    Uses the S3 client's move() once the RedshiftCopy for the same
    date/table/fn has completed.
    """

    date = luigi.DateParameter()
    table = luigi.Parameter()
    fn = luigi.Parameter()
    s3_load_bucket = luigi.Parameter(config_path=dict(section='path', name='s3_load_bucket'))
    s3_complete_bucket = luigi.Parameter(config_path=dict(section='path', name='s3_complete_bucket'))

    # The module alias is required here: the bare name `s3` refers to the
    # Config class defined in this file, which has no S3Client attribute.
    client = luigi_s3.S3Client()

    def requires(self):
        """Require the Redshift copy of this file before it is moved."""
        return [RedshiftCopy(date=self.date, table=self.table, fn=self.fn)]

    def _loaded_path(self):
        """Return the full S3 path of the file that RedshiftCopy loaded.

        ToS3 uploads the file under the local file's basename
        ("<fn>_<date>.csv"), not under the bare `fn`, so the path is taken
        from the upstream task instead of being rebuilt from `fn`.
        """
        return self.requires()[0].s3_load_path()

    def output(self):
        """Return the S3 target in the complete bucket, keeping the same key."""
        key = self._loaded_path().rsplit('/', 1)[-1]
        return luigi_s3.S3Target(self.s3_complete_bucket + key, client=self.client)

    def run(self):
        """Move the loaded file from the load bucket to the complete bucket."""
        self.client.move(self._loaded_path(), self.output().path)
class RedshiftCopy(luigi_redshift.S3CopyToTable):
    """Implementation of luigi.contrib.redshift.S3CopyToTable for the file uploaded by ToS3.

    The base class is referenced through the `luigi_redshift` module alias
    because the bare name `redshift` refers to the Config class defined in
    this file, which has no S3CopyToTable attribute.
    """

    date = luigi.DateParameter()
    fn = luigi.Parameter()

    # there are a lot more options available here
    # (i.e. temp table copy, prune based on date or column, etc.)
    table_type = luigi.Parameter(default='')
    table = luigi.Parameter()
    queries = luigi.Parameter(default=[])

    copy_options = "CSV IGNOREHEADER 1 BLANKSASNULL EMPTYASNULL TIMEFORMAT 'auto' DATEFORMAT 'auto'"

    # Connection settings and credentials come from the `redshift` and `s3`
    # Config classes; they are read once, when this class body is executed.
    host = redshift().host
    database = redshift().database
    user = redshift().user
    password = redshift().password
    aws_access_key_id = s3().aws_access_key_id
    aws_secret_access_key = s3().aws_secret_access_key

    def s3_load_path(self):
        """Return the S3 path of the file to copy (the output of ToS3)."""
        # optional usages, matching the variations noted in requires():
        #   return self.input().path
        #   return self.input()['s3'].path
        return self.input()[0].path

    def requires(self):
        """Require the upload of this file to S3."""
        # optional variations are:
        #   return ToS3(...)
        #   return {'s3': ToS3(...)}
        return [
            ToS3(date=self.date, fn=self.fn)
        ]
class ToS3(luigi.Task):
    """Utilization of luigi.s3. Pushes single input file to designated s3 bucket."""

    date = luigi.DateParameter()
    fn = luigi.Parameter()
    s3_load_bucket = luigi.Parameter(config_path=dict(section='path', name='s3_load_bucket'))
    # NOTE(review): ExampleBaseTask reads this same option as 'toS3_path';
    # this presumably works only if option lookup is case-insensitive - confirm.
    tos3_path = luigi.Parameter(config_path=dict(section='path', name='tos3_path'))

    # you do not have to specify parameters here b/c the S3Client() looks in
    # your luigi.cfg. The module alias is required: the bare name `s3` refers
    # to the Config class defined in this file, which has no S3Client.
    client = luigi_s3.S3Client()

    def get_fn(self):
        """Return the file name (last '/'-separated part) of the input file."""
        return self.input()[0].path.split('/')[-1]

    def requires(self):
        """Require the task that produces the local file to upload."""
        return [
            ExampleBaseTask(date=self.date, fn=self.fn)
        ]

    def output(self):
        """Return the S3 target in the load bucket, keyed by the file name."""
        return luigi_s3.S3Target(self.s3_load_bucket + self.get_fn(), client=self.client)

    def run(self):
        """Upload the local file from `tos3_path` to the load bucket."""
        self.client.put(self.tos3_path + '/' + self.get_fn(), self.s3_load_bucket + self.get_fn())
class ExampleBaseTask(luigi.Task):
    """Main task to do whatever. Sometimes this may also require other things."""

    date = luigi.DateParameter()
    fn = luigi.Parameter()
    toS3_path = luigi.Parameter(config_path=dict(section='path', name='toS3_path'))

    # This task doesn't 'require' anything, so requires() is not overridden
    # and the luigi.Task default is used.

    def output(self):
        """Return the local target "<toS3_path>/<fn>_<date>.csv"."""
        fields = {
            'path': self.toS3_path,
            'fn': self.fn,
            'date': str(self.date),
        }
        return luigi.LocalTarget("{path}/{fn}_{date}.csv".format(**fields))

    def run(self):
        """Do the task's work, then create a blank output file."""
        # do stuff
        handle = self.output().open('w')
        handle.close()  # opening for write and closing leaves an empty file
if __name__ == "__main__":
    # Look up the scratch directory configured under [path] in luigi.cfg.
    cfg = luigi.configuration.get_config()
    scratch_dir = cfg.get('path', 'tmp_path')

    # The tmp directory is cleared prior to execution: remove it if it is
    # there, then always recreate it empty.
    if os.path.exists(scratch_dir):
        shutil.rmtree(scratch_dir)
    os.makedirs(scratch_dir)

    # Kick off the wrapper task with a single worker.
    luigi.run(['ExampleTask', '--workers', '1'])
[core]
default-scheduler-port: 8082
error-email: <your_email>@<domain>.com
email-prefix: [LUIGI]
email-sender: <email address that the emails are sent from>
email-type: plain
max-reschedules: 1
smtp_host: smtp.gmail.com
smtp_login: <same as email-sender>
smtp_password: <email sender password>
smtp_port: 587
[path]
tmp_path: /Users/<user>/Desktop/outputs/tmp
toS3_path: /Users/<user>/Desktop/outputs/tos3
s3_load_bucket: s3://name.of.your.desired.bucket/
s3_complete_bucket: s3://name.of.your.complete.bucket/
[redshift]
host: <cluster_name>.<dns_to_your_redshift_cluster>:<port_num>
database: <db_name>
user: <db_user>
password: <db_password>
[s3]
aws_access_key_id: <key>
aws_secret_access_key: <secret>
calling_format: boto.s3.connection.OrdinaryCallingFormat
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment