Skip to content

Instantly share code, notes, and snippets.

@sofianhw
Forked from dlstadther/example_etl.py
Created July 30, 2017 11:01
Show Gist options
  • Save sofianhw/3b6b7a2d114b60d11c39a10f29946f0b to your computer and use it in GitHub Desktop.
Save sofianhw/3b6b7a2d114b60d11c39a10f29946f0b to your computer and use it in GitHub Desktop.
Example ETL Using Luigi
# import python core modules
import os
import shutil
import datetime

# import external modules
import pandas as pd
import requests

# import luigi modules
import luigi
from luigi.contrib import redshift
from luigi import configuration, s3
# The config classes `redshift` and `s3` defined in this file shadow the two
# module names imported above, so code that needs the luigi modules themselves
# uses these aliases instead.
from luigi.contrib import redshift as luigi_redshift
from luigi.contrib import s3 as luigi_s3
# Module metadata: original author and authoring date.
__author__ = "Dillon Stadther"
__date__ = "2017-03-21"
class redshift(luigi.Config):
    """Redshift connection settings read from the ``[redshift]`` section of luigi.cfg.

    Luigi's config system looks up each parameter below under the section
    named after this class, so the class name must stay ``redshift``.
    Every option falls back to an empty string when it is absent.
    """

    host = luigi.Parameter(default="")
    database = luigi.Parameter(default="")
    user = luigi.Parameter(default="")
    password = luigi.Parameter(default="")
class s3(luigi.Config):
    """AWS credentials read from the ``[s3]`` section of luigi.cfg.

    Luigi's config system looks up each parameter below under the section
    named after this class, so the class name must stay ``s3``.
    Both options fall back to an empty string when they are absent.
    """

    aws_access_key_id = luigi.Parameter(default="")
    aws_secret_access_key = luigi.Parameter(default="")
class ExampleTask(luigi.Task):
    """Top-level task: load each configured file into its Redshift table.

    ``requires`` yields one ``RedshiftCopy`` per job. Once all of them are
    complete, ``run`` creates an empty marker file under ``tmp_path``. That
    file is this task's own ``output`` for ``date``.
    """

    # Default to today's date in UTC. ``utcnow()`` returns a datetime, so take
    # ``.date()`` explicitly rather than handing a datetime to a DateParameter.
    # ``str(self.date)`` ends up in the marker filename, so it must be a plain
    # date. On a machine whose clock is already UTC,
    # ``datetime.date.today()`` is equivalent.
    # NOTE: the default is computed once, when this module is imported.
    date = luigi.DateParameter(default=datetime.datetime.utcnow().date())
    tmp_path = luigi.Parameter(config_path=dict(section='path', name='tmp_path'))

    def requires(self):
        """Yield one RedshiftCopy per (table, file) job.

        Anything returned or yielded here must report ``complete()`` as true
        (i.e. its output exists) before this task's ``run`` executes.
        """
        jobs = [
            {'table': 'my_table', 'fn': 'my_file'},
            {'table': 'your_table', 'fn': 'your_file'},
        ]
        for job in jobs:
            yield RedshiftCopy(date=self.date, table=job['table'], fn=job['fn'])

    def output(self):
        """Return the marker file that records this task as done for ``date``."""
        return luigi.LocalTarget(os.path.join(self.tmp_path, "example_task_%s" % self.date))

    def run(self):
        # Nothing to compute: creating the empty marker file is what marks
        # this task complete.
        with self.output().open('w'):
            pass
class RedshiftCopy(luigi_redshift.S3CopyToTable):
    """Load one S3 object into a Redshift table using luigi's S3CopyToTable.

    The base class is taken from the ``luigi_redshift`` alias because the name
    ``redshift`` in this module refers to the config class, not to
    ``luigi.contrib.redshift``. The object loaded is the one uploaded by the
    single ToS3 dependency declared in ``requires``.
    """

    date = luigi.DateParameter()
    fn = luigi.Parameter()
    # S3CopyToTable offers many more options (e.g. temp-table copy, pruning by
    # date or column).
    table_type = luigi.Parameter(default='')
    table = luigi.Parameter()
    # Forwarded to S3CopyToTable's ``queries`` hook. Use an immutable default
    # so no list object is shared between instances.
    queries = luigi.ListParameter(default=())

    copy_options = "CSV IGNOREHEADER 1 BLANKSASNULL EMPTYASNULL TIMEFORMAT 'auto' DATEFORMAT 'auto'"

    # Connection settings, read once when the class is defined, from the
    # ``redshift`` and ``s3`` config classes in this module.
    host = redshift().host
    database = redshift().database
    user = redshift().user
    password = redshift().password
    aws_access_key_id = s3().aws_access_key_id
    aws_secret_access_key = s3().aws_secret_access_key

    def s3_load_path(self):
        """Return the S3 path of the object produced by the ToS3 dependency."""
        # ``requires`` returns a one-element list, so ``input()`` is a list too.
        # If requires returned the task directly, use ``self.input().path``.
        # If it returned ``{'s3': ToS3(...)}``, use ``self.input()['s3'].path``.
        return self.input()[0].path

    def requires(self):
        # Alternatives: ``return ToS3(...)`` or ``return {'s3': ToS3(...)}``.
        # s3_load_path must be adjusted to match.
        return [
            ToS3(date=self.date, fn=self.fn)
        ]
class ToS3(luigi.Task):
    """Upload the single local file produced by ExampleBaseTask to the S3 load bucket.

    The object key is the local file's basename, placed directly under
    ``s3_load_bucket``.
    """

    date = luigi.DateParameter()
    fn = luigi.Parameter()
    s3_load_bucket = luigi.Parameter(config_path=dict(section='path', name='s3_load_bucket'))
    # Kept as a parameter for compatibility. run() uploads the path reported
    # by the dependency instead of rebuilding it from this value.
    tos3_path = luigi.Parameter(config_path=dict(section='path', name='tos3_path'))

    # This is luigi's S3Client, reached through the ``luigi_s3`` alias because
    # the name ``s3`` in this module is the config class. No arguments are
    # needed: S3Client reads its settings from luigi.cfg. It is created once,
    # when the class is defined, and shared by every instance.
    client = luigi_s3.S3Client()

    def get_fn(self):
        """Return the basename of the local file produced by the dependency."""
        return os.path.basename(self.input()[0].path)

    def requires(self):
        return [
            ExampleBaseTask(date=self.date, fn=self.fn)
        ]

    def output(self):
        """Return the S3 target ``<s3_load_bucket>/<basename of the local file>``."""
        # luigi.cfg shows s3_load_bucket with a trailing '/', but don't rely on
        # it. A missing slash would otherwise glue the key onto the bucket name.
        prefix = self.s3_load_bucket
        if not prefix.endswith('/'):
            prefix += '/'
        return luigi_s3.S3Target(prefix + self.get_fn(), client=self.client)

    def run(self):
        # Upload exactly the file the dependency wrote, to this task's output path.
        self.client.put(self.input()[0].path, self.output().path)
class ExampleBaseTask(luigi.Task):
    """Fetch rows from an HTTP API and write them to a local CSV file.

    This task has no dependencies, so it relies on the inherited
    ``luigi.Task.requires()``. Real-world variants may also need to require
    other tasks.
    """

    date = luigi.DateParameter()
    fn = luigi.Parameter()
    tos3_path = luigi.Parameter(config_path=dict(section='path', name='tos3_path'))

    # Placeholder endpoint; replace with the real API URL.
    url = 'http://whatever.com/api/endpoint'
    # Seconds before the HTTP request gives up. Without a timeout, requests
    # can wait indefinitely.
    request_timeout = 60

    def output(self):
        """Return the local CSV target ``<tos3_path>/<fn>_<date>.csv``."""
        return luigi.LocalTarget(
            "{path}/{fn}_{date}.csv".format(
                path=self.tos3_path,
                fn=self.fn,
                date=str(self.date),
            )
        )

    def parse_response(self, response):
        """Turn the HTTP response into a list of rows (each a list of values).

        Placeholder: this assumes the endpoint returns a header-less CSV body
        whose rows have four fields. TODO: replace with the real parsing for
        your API.
        """
        import csv
        import io
        return list(csv.reader(io.StringIO(response.text)))

    def run(self):
        response = requests.get(self.url, timeout=self.request_timeout)
        # Fail the task on an HTTP error instead of writing the error body out as data.
        response.raise_for_status()
        data = self.parse_response(response)

        # The header is known ahead of time, so supply it explicitly.
        df = pd.DataFrame(data, columns=['col1', 'col2', 'col3', 'col4'])
        # Manipulate the data as needed; here, keep and reorder a subset of columns.
        df = df[['col2', 'col4', 'col1']]

        # temporary_path() yields a scratch path that luigi moves onto
        # output().path only when the block exits cleanly. A failed write
        # therefore never leaves a partial CSV that would make this task look
        # complete.
        with self.output().temporary_path() as tmp_csv:
            df.to_csv(tmp_csv, index=False, encoding='utf-8')
if __name__ == "__main__":
    import errno

    config = luigi.configuration.get_config()
    tmp_path = config.get('path', 'tmp_path')

    # Start every run with an empty tmp directory. ExampleTask writes its
    # completion marker there, so clearing it forces the pipeline to re-run.
    try:
        shutil.rmtree(tmp_path)
    except OSError as err:
        # A directory that doesn't exist yet is fine. Anything else (e.g. a
        # permission problem) is a real error and must not be hidden.
        if err.errno != errno.ENOENT:
            raise
    os.makedirs(tmp_path)

    luigi.run(['ExampleTask', '--workers', '1'])
[core]
default-scheduler-port: 8082
error-email: <your_email>@<domain>.com
email-prefix: [LUIGI]
email-sender: <address the notification emails are sent from>
email-type: plain
max-reschedules: 1
smtp_host: smtp.gmail.com
smtp_login: <same as email-sender>
smtp_password: <email sender password>
smtp_port: 587
[path]
tmp_path: /Users/<user>/Desktop/outputs/tmp
tos3_path: /Users/<user>/Desktop/outputs/tos3
s3_load_bucket: s3://name.of.your.desired.bucket/
s3_complete_bucket: s3://name.of.your.complete.bucket/
[redshift]
host: <cluster_name>.<dns_to_your_redshift_cluster>:<port_num>
database: <db_name>
user: <db_user>
password: <db_password>
[s3]
aws_access_key_id: <key>
aws_secret_access_key: <secret>
calling_format: boto.s3.connection.OrdinaryCallingFormat
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment