Skip to content

Instantly share code, notes, and snippets.

@vagelim
Last active February 25, 2016 17:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vagelim/1dc98c906ce2779f7118 to your computer and use it in GitHub Desktop.
Save vagelim/1dc98c906ce2779f7118 to your computer and use it in GitHub Desktop.
Luigi job to copy S3 data to Redshift
import luigi
from luigi import configuration
import datetime
from luigi.contrib import redshift
from mortar.luigi import mortartask
from dd_tasks import DatadogPigscriptTask
from luigi.s3 import S3Target, S3PathTask
from mortar.luigi.s3transfer import S3ToLocalTask
import dd_utils
#from redshift_utils import CopyToRedshiftTask
import logging
# Module-level logger registered under the name 'luigi-interface'.
logger = logging.getLogger('luigi-interface')

# Captured once, when the module is imported; the dated S3 key below is
# derived from this value and does not change for the life of the process.
now = datetime.datetime.now()

# Object key of the input file: clean/<YYYY-MM-DD>/part-m-00000.
# The literal text around the directives contains no '%', so strftime
# passes it through unchanged.
filename = now.strftime('clean/%Y-%m-%d/part-m-00000')

# Full S3 URL of the file that gets loaded.
S3 = 's3://seranking/{0}'.format(filename)

# Base S3 location assigned to the task's ``output_base_path``.
OUT = 's3://seranking/copyjob'
class CopyToRedShift(redshift.S3CopyToTable):
    """Luigi task that loads a tab-separated S3 file into a Redshift table.

    The source file is the module-level ``S3`` path and the destination is
    the ``seranking_test`` table.  Connection settings and AWS keys are read
    from the ``redshift`` section of the Luigi configuration.
    """

    # Not read by any method in this class.
    # NOTE(review): presumably consumed by the base class or other tooling -- confirm.
    output_base_path = OUT

    # (column name, column type) pairs for the destination table.
    columns = [
        ('keyword', 'text'),
        ('search_volume', 'int'),
        ('ranking', 'int'),
        ('tags', 'text'),
    ]

    # Fields in the input file are tab-delimited.
    column_separator = '\t'

    # Task parameters.  None of them is read by the methods below.
    # NOTE(review): ``env`` defaults to 'prod' while the config option names
    # are hard-coded to the ``prod_`` prefix -- confirm whether they should
    # follow ``env``.
    env = luigi.Parameter(default='prod')
    version = luigi.Parameter(default='1.0')
    cluster_size = luigi.IntParameter(default=0)

    def redshift_credentials(self):
        """Return the Redshift connection settings and AWS keys as a dict.

        Every value is looked up in the ``redshift`` section of the Luigi
        configuration.  The result has the keys ``host``, ``port``,
        ``database``, ``username``, ``password``, ``aws_access_key_id`` and
        ``aws_secret_access_key``.
        """
        cfg = configuration.get_config()
        # (result key, config option) pairs, in lookup order.
        lookups = (
            ('host', 'prod_host'),
            ('port', 'port'),
            ('database', 'prod_database'),
            ('username', 'prod_username'),
            ('password', 'prod_password'),
            ('aws_access_key_id', 'aws_access_key_id'),
            ('aws_secret_access_key', 'aws_secret_access_key'),
        )
        return dict(
            (key, cfg.get('redshift', option)) for key, option in lookups
        )

    def s3_load_path(self):
        """Return the S3 URL of the file to load (module-level ``S3``)."""
        return S3

    @property
    def aws_access_key_id(self):
        """AWS access key id taken from the configured credentials."""
        credentials = self.redshift_credentials()
        return credentials['aws_access_key_id']

    @property
    def aws_secret_access_key(self):
        """AWS secret access key taken from the configured credentials."""
        credentials = self.redshift_credentials()
        return credentials['aws_secret_access_key']

    @property
    def database(self):
        """Name of the Redshift database to connect to."""
        credentials = self.redshift_credentials()
        return credentials['database']

    @property
    def user(self):
        """Redshift user name (the ``username`` credential)."""
        credentials = self.redshift_credentials()
        return credentials['username']

    @property
    def password(self):
        """Password for the Redshift user."""
        credentials = self.redshift_credentials()
        return credentials['password']

    @property
    def host(self):
        """Redshift endpoint formatted as ``<host>:<port>``."""
        # NOTE(review): the credentials are rebuilt for each of the two
        # lookups, so the config is read twice per access.
        hostname = self.redshift_credentials()['host']
        port = self.redshift_credentials()['port']
        return hostname + ':' + port

    @property
    def table(self):
        """Name of the destination table."""
        return 'seranking_test'

    @property
    def copy_options(self):
        """Extra options string for the load: ``IGNOREHEADER 0``."""
        return 'IGNOREHEADER 0'
# Script entry point: hand control to Luigi with CopyToRedShift as the main task.
if __name__ == '__main__':
    luigi.run(main_task_cls=CopyToRedShift)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment