Skip to content

Instantly share code, notes, and snippets.

@dlstadther
Last active January 30, 2023 14:47
Show Gist options
  • Save dlstadther/eec24fe797d7c86ec1a19389a0ce9d7c to your computer and use it in GitHub Desktop.
Save dlstadther/eec24fe797d7c86ec1a19389a0ce9d7c to your computer and use it in GitHub Desktop.
Example ETL Using Luigi
# import python core modules
import csv
import datetime
import io
import logging
# import external modules
import pandas as pd
import requests
# import luigi modules
import luigi
from luigi.contrib import redshift, s3
# aliased module handles: the `redshift` and `s3` config classes defined further
# down rebind the bare names, so the contrib modules are reached through these
from luigi.contrib import redshift as luigi_redshift
from luigi.contrib import s3 as luigi_s3
# meta data
__author__ = 'Dillon Stadther'
__date__ = '2018-01-30'

# strftime patterns at day, hour and minute resolution
DATE_FORMAT = '%Y-%m-%d'
DATEHOUR_FORMAT = '%Y-%m-%dT%H'
DATEMINUTE_FORMAT = '%Y-%m-%dT%H%M'

# module-wide handle on the 'luigi-interface' logger
logger = logging.getLogger('luigi-interface')
class path(luigi.Config):
    """Filesystem / bucket locations, read through Luigi's config system.

    Neither parameter has a default, so both must be supplied by the
    configuration (section named after this class) before `path()` is used.
    """

    # local directory where files are staged before being pushed to S3
    tos3 = luigi.Parameter()
    # S3 prefix the staged files are uploaded under
    s3_load_bucket = luigi.Parameter()
class redshift(luigi.Config):
    """Redshift connection settings pulled from the section of luigi.cfg named after this class.

    Every value falls back to an empty string when the config does not provide it.
    """

    host = luigi.Parameter(default='')
    database = luigi.Parameter(default='')
    user = luigi.Parameter(default='')
    password = luigi.Parameter(default='')
class s3(luigi.Config):
    """AWS credentials pulled from the section of luigi.cfg named after this class.

    Both values fall back to an empty string when the config does not provide them.
    """

    aws_access_key_id = luigi.Parameter(default='')
    aws_secret_access_key = luigi.Parameter(default='')
class ExampleTask(luigi.WrapperTask):
    """Entry-point wrapper that requests one RedshiftCopy per (table, file) job."""

    # A plain date, matching the DateParameter declared by every downstream task
    # and the documented CLI usage (`--date '2018-01-01'`), which an hour-resolution
    # parameter cannot parse. The default is the UTC date at import time.
    date = luigi.DateParameter(default=datetime.datetime.utcnow().date())

    def requires(self):
        """
        Anything returned or yielded by requires must have a 'true' complete() method (aka successful output) before
        this class's run method will execute.
        """
        jobs = [
            {'table': 'my_table', 'fn': 'my_file'},
            {'table': 'your_table', 'fn': 'your_file'},
        ]
        for job in jobs:
            yield RedshiftCopy(
                date=self.date,
                table=job['table'],
                fn=job['fn']
            )
# `luigi_redshift` is the luigi.contrib.redshift module; the bare name `redshift`
# refers to the config class defined above and has no S3CopyToTable attribute.
class RedshiftCopy(luigi_redshift.S3CopyToTable):
    """Implementation of redshift.S3CopyToTable: copies the file uploaded by ToS3 into `table`."""

    date = luigi.DateParameter()
    fn = luigi.Parameter()

    # there are a lot more options available here (i.e. temp table copy, prune based on date or column, etc.)
    table_type = luigi.Parameter(default='')
    table = luigi.Parameter()
    queries = luigi.ListParameter(default=[])

    copy_options = "CSV IGNOREHEADER 1 BLANKSASNULL EMPTYASNULL TIMEFORMAT 'auto' DATEFORMAT 'auto'"

    # connection settings and credentials come from the `redshift` / `s3` config classes
    host = redshift().host
    database = redshift().database
    user = redshift().user
    password = redshift().password

    aws_access_key_id = s3().aws_access_key_id
    aws_secret_access_key = s3().aws_secret_access_key

    def s3_load_path(self):
        """Return the S3 path of the file to copy, i.e. the output of the required ToS3 task."""
        # optional usages (for the alternative return shapes noted in requires())
        #     return self.input().path
        #     return self.input()['s3'].path
        return self.input()[0].path

    def requires(self):
        """Require the upload of this job's file to S3."""
        # optional variations are:
        #     return ToS3(...)
        #     return {'s3': ToS3(...)}
        return [
            ToS3(date=self.date, fn=self.fn)
        ]
class ToS3(luigi.Task):
    """Utilization of luigi's s3 module. Pushes single input file to designated s3 bucket."""

    date = luigi.DateParameter()
    fn = luigi.Parameter()

    # `luigi_s3` is the luigi.contrib.s3 module (the bare name `s3` is the config class above).
    # you do not have to specify parameters here b/c the S3Client() looks in your luigi.cfg
    client = luigi_s3.S3Client()

    @property
    def fn_src(self):
        """Base name (last '/'-separated component) of the upstream task's output path."""
        return self.input()[0].path.split('/')[-1]

    def requires(self):
        """Require the task that produces the local file to upload."""
        return [
            ExampleBaseTask(date=self.date, fn=self.fn)
        ]

    def output(self):
        """S3 target at `<s3_load_bucket><source file name>`."""
        # the property is named `fn_src`; the previous `self.src_fn` lookup raised AttributeError
        return luigi_s3.S3Target("{}{}".format(path().s3_load_bucket, self.fn_src), client=self.client)

    def run(self):
        """Upload the local input file to the output S3 path."""
        local_path = self.input()[0].path
        remote_path = self.output().path
        logger.info('Uploading {} to {}'.format(local_path, remote_path))
        self.client.put(local_path, remote_path)
class ExampleBaseTask(luigi.Task):
    """Main task to do whatever. Sometimes this may also require other things."""

    date = luigi.DateParameter()
    fn = luigi.Parameter()

    # since this task doesn't 'require' anything, that method can be defaulted to luigi.Task.requires()

    def output(self):
        """Local CSV target at `<path.tos3>/<fn>_<date>.csv`."""
        return luigi.LocalTarget(
            "{path}/{fn}_{date}.csv".format(
                path=path().tos3,
                fn=self.fn,
                date=self.date.strftime(DATE_FORMAT)
            )
        )

    def run(self):
        """Fetch the API payload, keep the wanted columns and write them to the output CSV."""
        # a timeout keeps a hung endpoint from blocking the worker forever, and
        # raise_for_status() fails the task instead of writing an error page to disk
        response = requests.get('http://whatever.com/api/endpoint', timeout=30)
        response.raise_for_status()

        # somehow you got the response into a list of lists (aka, csv format) and you happen to know the header
        # NOTE(review): assumes the endpoint returns header-less CSV text with four
        # fields per row -- adapt this parsing step to the real payload
        data = list(csv.reader(io.StringIO(response.text)))

        # convert to dataframe
        df = pd.DataFrame(data, columns=['col1', 'col2', 'col3', 'col4'])

        # you can now manipulate the data easily within the dataframe
        # one easy thing is reordering desired columns
        df = df[['col2', 'col4', 'col1']]

        # write dataframe to output csv through a temporary path so a crash mid-write
        # cannot leave a partial file that makes the task look complete
        with self.output().temporary_path() as tmp_path:
            df.to_csv(tmp_path, index=False, encoding='utf-8')
if __name__ == "__main__":
    # run via CLI with non-default var `python example_etl.py ExampleTask --date '2018-01-01'`
    luigi.run()
[DEFAULT]
user_path: /home/my-user
[loggers]
keys=root, luigi-interface
[formatters]
keys=standardFormatter, consoleFormatter
[handlers]
keys=root, luigiHandler, consoleHandler
[logger_root]
level=ERROR
handlers=root
[logger_luigi-interface]
level=DEBUG
handlers=consoleHandler,luigiHandler
qualname=luigi-interface
propagate=0
[formatter_standardFormatter]
format=%(asctime)s.%(msecs)03d %(name)-12s %(levelname)-8s %(message)s
datefmt=%y-%m-%d %H:%M:%S
[formatter_consoleFormatter]
format=%(levelname)s - %(message)s
datefmt=
[handler_root]
level=WARNING
class=handlers.TimedRotatingFileHandler
formatter=standardFormatter
args=("%(user_path)s/outputs/log/luigi-root.log","midnight",1,14)
[handler_luigiHandler]
class=handlers.TimedRotatingFileHandler
formatter=standardFormatter
args=("%(user_path)s/outputs/log/luigi.log","midnight",1,14)
[handler_consoleHandler]
class=StreamHandler
level=WARNING
formatter=consoleFormatter
args=(sys.stdout,)
[DEFAULT]
user_path: /home/my-user
[core]
default-scheduler-port: 8082
error-email: <your_email>@<domain>.com
email-prefix: [LUIGI]
email-sender: <email which you want to send the email>
email-type: plain
max-reschedules: 1
smtp_host: smtp.gmail.com
smtp_login: <same as email-sender>
smtp_password: <email sender password>
smtp_port: 587
logging_conf_file: %(user_path)s/this-etl/logging.conf
[path]
tmp_path: %(user_path)s/outputs/tmp
tos3_path: %(user_path)s/outputs/tos3
# the `path` config class reads this value as `tos3`; the original key is kept as well
tos3: %(tos3_path)s
s3_load_bucket: s3://name.of.your.desired.bucket/
s3_complete_bucket: s3://name.of.your.complete.bucket/
[redshift]
host: <cluster_name>.<dns_to_your_redshift_cluster>:<port_num>
database: <db_name>
user: <db_user>
password: <db_password>
[s3]
aws_access_key_id: <key>
aws_secret_access_key: <secret>
calling_format: boto.s3.connection.OrdinaryCallingFormat
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment