-
-
Save sofianhw/3b6b7a2d114b60d11c39a10f29946f0b to your computer and use it in GitHub Desktop.
Example ETL Using Luigi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# import python core modules | |
import os | |
import shutil | |
import datetime | |
# import external modules | |
import pandas as pd | |
import requests | |
# import luigi modules | |
import luigi | |
from luigi.contrib import redshift | |
from luigi import configuration, s3 | |
# meta data | |
__author__ = 'Dillon Stadther' | |
__date__ = '2017-03-21' | |
class redshift(luigi.Config):
    """Redshift connection settings, read through Luigi's config system.

    Luigi looks the values up in the luigi.cfg section named after this class,
    i.e. ``[redshift]``, which is why the class name is lowercase. Each setting
    falls back to an empty string when the section or key is missing.
    """

    # NOTE: this class reuses the name of the `redshift` module imported from
    # luigi.contrib at the top of the file; after this definition the bare name
    # `redshift` refers to this config class, not to the module.
    host = luigi.Parameter(default="")
    database = luigi.Parameter(default="")
    user = luigi.Parameter(default="")
    password = luigi.Parameter(default="")
class s3(luigi.Config):
    """AWS credentials for S3, read through Luigi's config system.

    Luigi looks the values up in the luigi.cfg section named after this class,
    i.e. ``[s3]``, which is why the class name is lowercase. Both settings fall
    back to an empty string when the section or key is missing.
    """

    # NOTE: this class reuses the name of the `s3` module imported from luigi at
    # the top of the file; after this definition the bare name `s3` refers to
    # this config class, not to the module.
    aws_access_key_id = luigi.Parameter(default="")
    aws_secret_access_key = luigi.Parameter(default="")
class ExampleTask(luigi.Task):
    """Entry-point task: requires one RedshiftCopy per job, then writes a marker file.

    Parameters:
        date: the run date; defaults to the current UTC date.
        tmp_path: directory for the marker file, read from ``tmp_path`` in the
            ``[path]`` section of luigi.cfg.
    """

    # Use the UTC *date*, not the full datetime: the value is formatted into the
    # output file name with str(), and a datetime would add a time-of-day
    # component (spaces, colons, microseconds) to that name.
    # If the machine's clock is already UTC, datetime.date.today() is equivalent.
    # The default is computed once, when this class body is executed.
    date = luigi.DateParameter(default=datetime.datetime.utcnow().date())
    tmp_path = luigi.Parameter(config_path=dict(section='path', name='tmp_path'))

    def requires(self):
        """Yield one RedshiftCopy task per (table, file name) job.

        Anything returned or yielded by requires must have a 'true' complete()
        method (aka successful output) before this class's run method will execute.
        """
        jobs = [
            {'table': 'my_table', 'fn': 'my_file'},
            {'table': 'your_table', 'fn': 'your_file'},
        ]

        for job in jobs:
            yield RedshiftCopy(
                date=self.date,
                table=job['table'],
                fn=job['fn'],
            )

    def output(self):
        """Return the local marker file ``<tmp_path>/example_task_<date>``."""
        return luigi.LocalTarget("%s/example_task_%s" % (self.tmp_path, str(self.date)))

    def run(self):
        """Create the empty marker file that flags this task as complete."""
        with self.output().open('w'):
            pass
# The base class is spelled with its full dotted path on purpose: the bare name
# `redshift` is rebound by the `redshift` config class defined earlier in this
# file, so `redshift.S3CopyToTable` would look the attribute up on that config
# class instead of on the luigi.contrib.redshift module.
class RedshiftCopy(luigi.contrib.redshift.S3CopyToTable):
    """Implementation of luigi.contrib.redshift.S3CopyToTable.

    Requires a single ToS3 task and uses the path of its output as the S3 load path.

    Parameters:
        date: run date, forwarded to ToS3.
        fn: base file name, forwarded to ToS3.
        table_type: defaults to an empty string.
        table: name of the destination table.
        queries: list parameter, empty by default.
    """

    date = luigi.DateParameter()
    fn = luigi.Parameter()

    # there are a lot more options available here (i.e. temp table copy, prune based on date or column, etc.)
    table_type = luigi.Parameter(default='')
    table = luigi.Parameter()
    queries = luigi.ListParameter(default=[])

    copy_options = "CSV IGNOREHEADER 1 BLANKSASNULL EMPTYASNULL TIMEFORMAT 'auto' DATEFORMAT 'auto'"

    # Connection settings and credentials come from the `redshift` and `s3`
    # config classes defined in this file (the [redshift] and [s3] sections of
    # luigi.cfg). They are read once, when this class body is executed.
    host = redshift().host
    database = redshift().database
    user = redshift().user
    password = redshift().password
    aws_access_key_id = s3().aws_access_key_id
    aws_secret_access_key = s3().aws_secret_access_key

    def s3_load_path(self):
        """Return the path of the first (and only) required target.

        Must match the shape returned by requires(); for the variations noted there:
            return self.input().path
            return self.input()['s3'].path
        """
        return self.input()[0].path

    def requires(self):
        """Return the ToS3 task for this date and file name, wrapped in a list.

        Optional variations are:
            return ToS3(...)
            return {'s3': ToS3(...)}
        """
        return [
            ToS3(date=self.date, fn=self.fn)
        ]
class ToS3(luigi.Task):
    """Utilization of luigi.s3. Pushes single input file to designated s3 bucket.

    Parameters:
        date: run date, forwarded to ExampleBaseTask.
        fn: base file name, forwarded to ExampleBaseTask.
        s3_load_bucket: destination prefix, read from ``s3_load_bucket`` in the
            ``[path]`` section of luigi.cfg.
        tos3_path: local directory holding the file to upload, read from
            ``tos3_path`` in the ``[path]`` section of luigi.cfg.
    """

    date = luigi.DateParameter()
    fn = luigi.Parameter()
    s3_load_bucket = luigi.Parameter(config_path=dict(section='path', name='s3_load_bucket'))
    tos3_path = luigi.Parameter(config_path=dict(section='path', name='tos3_path'))

    # Reach the S3 module through the `luigi` package: the bare name `s3` is
    # rebound by the `s3` config class defined earlier in this file.
    # you do not have to specify parameters here b/c the S3Client() looks in your luigi.cfg
    client = luigi.s3.S3Client()

    def get_fn(self):
        """Return the file-name component (text after the last '/') of the input path."""
        return self.input()[0].path.rsplit('/', 1)[-1]

    def _s3_destination(self):
        """Return the destination URL: the load-bucket prefix followed by the file name."""
        return "%s%s" % (self.s3_load_bucket, self.get_fn())

    def requires(self):
        """Return the ExampleBaseTask that produces the file to upload, wrapped in a list."""
        return [
            ExampleBaseTask(date=self.date, fn=self.fn)
        ]

    def output(self):
        """Return the S3 target for the uploaded file."""
        return luigi.s3.S3Target(self._s3_destination(), client=self.client)

    def run(self):
        """Upload ``<tos3_path>/<file name>`` to the destination URL."""
        local_path = "%s/%s" % (self.tos3_path, self.get_fn())
        self.client.put(local_path, self._s3_destination())
class ExampleBaseTask(luigi.Task):
    """Main task to do whatever. Sometimes this may also require other things.

    Fetches data from an HTTP endpoint, loads it into a DataFrame, keeps a
    reordered subset of the columns and writes the result as a CSV file.

    Parameters:
        date: run date, used in the output file name.
        fn: base name of the output file.
        tos3_path: output directory, read from ``tos3_path`` in the ``[path]``
            section of luigi.cfg.
    """

    date = luigi.DateParameter()
    fn = luigi.Parameter()
    tos3_path = luigi.Parameter(config_path=dict(section='path', name='tos3_path'))

    # since this task doesn't 'require' anything, that method can be left at the luigi.Task default

    def output(self):
        """Return the local target ``<tos3_path>/<fn>_<date>.csv``."""
        return luigi.LocalTarget(
            "{path}/{fn}_{date}.csv".format(
                path=self.tos3_path,
                fn=self.fn,
                date=str(self.date),
            )
        )

    def _parse_response(self, response):
        """Turn the HTTP response into a list of rows (a list of lists, aka csv format).

        Placeholder: how to parse the payload depends on the API being called,
        so this must be implemented before the task can run. Each row needs one
        value per column named in run().

        Raises:
            NotImplementedError: always, until replaced with real parsing.
        """
        raise NotImplementedError(
            "ExampleBaseTask._parse_response must convert response.content into a list of rows"
        )

    def run(self):
        """Download the data, reshape it with pandas and write the output CSV.

        Raises:
            requests.HTTPError: if the endpoint answers with an error status.
        """
        response = requests.get('http://whatever.com/api/endpoint')
        # Fail here on a 4xx/5xx answer instead of parsing an error page as data.
        response.raise_for_status()

        # you happen to know the header, so the column names are supplied here
        data = self._parse_response(response)

        # convert to dataframe
        df = pd.DataFrame(data, columns=['col1', 'col2', 'col3', 'col4'])

        # you can now manipulate the data easily within the dataframe
        # one easy thing is reordering desired columns
        df = df[['col2', 'col4', 'col1']]

        # write dataframe to output csv
        df.to_csv(self.output().path, index=False, encoding='utf-8')
if __name__ == "__main__":
    import errno

    config = luigi.configuration.get_config()
    tmp_path = config.get('path', 'tmp_path')

    # I keep a tmp directory that is cleared prior to execution.
    # Try the removal and tolerate only "directory does not exist"; any other
    # failure (e.g. permissions) is re-raised rather than hidden.
    try:
        shutil.rmtree(tmp_path)
    except OSError as err:
        if err.errno != errno.ENOENT:
            raise
    os.makedirs(tmp_path)

    luigi.run(['ExampleTask', '--workers', '1'])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[core] | |
default-scheduler-port: 8082 | |
error-email: <your_email>@<domain>.com | |
email-prefix: [LUIGI] | |
email-sender: <address the notification emails are sent from> | |
email-type: plain | |
max-reschedules: 1 | |
smtp_host: smtp.gmail.com | |
smtp_login: <same as email-sender> | |
smtp_password: <email sender password> | |
smtp_port: 587 | |
[path] | |
tmp_path: /Users/<user>/Desktop/outputs/tmp | |
tos3_path: /Users/<user>/Desktop/outputs/tos3 | |
s3_load_bucket: s3://name.of.your.desired.bucket/ | |
s3_complete_bucket: s3://name.of.your.complete.bucket/ | |
[redshift] | |
host: <cluster_name>.<dns_to_your_redshift_cluster>:<port_num> | |
database: <db_name> | |
user: <db_user> | |
password: <db_password> | |
[s3] | |
aws_access_key_id: <key> | |
aws_secret_access_key: <secret> | |
calling_format: boto.s3.connection.OrdinaryCallingFormat |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment