@neil90
Created October 3, 2015 00:14

sample_sqoop.sh

hadoop fs -rmr ./AAA_AnnualReview
hadoop fs -rmr ./AAA_Employee
hadoop fs -rmr ./tbl_Underwriting_Extract
sqoop import --driver com.microsoft.sqlserver.jdbc.SQLServerDriver --connect 'jdbc:sqlserver://sa3sql500:51433;database=BigData' --table AAA_AnnualReview --username BigData_ETL_User --password BigData_ETL_User -m 1
sqoop import --driver com.microsoft.sqlserver.jdbc.SQLServerDriver --connect 'jdbc:sqlserver://sa3sql500:51433;database=BigData' --table AAA_Employee --username BigData_ETL_User --password BigData_ETL_User -m 1
sqoop import --driver com.microsoft.sqlserver.jdbc.SQLServerDriver --connect 'jdbc:sqlserver://sa3sql500:51433;database=BigData' --table tbl_Underwriting_Extract --username BigData_ETL_User --password BigData_ETL_User -m 1
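These are the commands the Python script below parses back out of sample_sqoop.sh and fans out over a multiprocessing pool: the hadoop fs -rmr lines run first (by default a Sqoop import without --target-dir writes to the user's HDFS home directory under the table name, so the old copies have to be cleared), and the three sqoop import jobs are then launched in parallel.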
import multiprocessing  # the module we will be using for multiprocessing
import time
import subprocess
import logging
import sys

start = time.time()


def get_cmd(fn):
    """
    Parses the bash file for commands.
    """
    with open(fn) as shfile:
        # keep only the non-empty lines from the bash script
        clean_lines = [line for line in shfile.readlines() if line.strip()]
    # hdfs remove commands
    hdfsrmr = [rmr for rmr in clean_lines if 'hadoop fs -rmr' in rmr]
    # sqoop import commands
    sqooper = [sq for sq in clean_lines if sq.startswith('sqoop')]
    return hdfsrmr, sqooper


def work(cmd):
    """
    Multiprocessing worker: runs a single shell command.
    """
    #print 'Running command\n%s\n...' % cmd
    subprocess.call(cmd, shell=True)


def main():
    # bash file to parse
    batch_script = 'sample_sqoop.sh'
    # The machine has 24 cores, so spin up 24 worker processes;
    # using the max doesn't matter since the real work is done on YARN.
    number_processes = 24
    pool = multiprocessing.Pool(number_processes)
    logging.info('Total number of threads being spun up %s.' % number_processes)

    try:
        hadooprmr, sqooplines = get_cmd(batch_script)
        logging.info('Total commands being run %s' % (len(hadooprmr) + len(sqooplines)))
    except Exception:
        logging.exception('Could not find batch script')
        #print 'Could not find file ending script'
        sys.exit(0)

    # HDFS directories have to be removed first
    logging.info('Removing hdfs files...')
    pool.map(work, hadooprmr)

    # then run the sqoop jobs
    logging.info('Sqooping...')
    pool.map(work, sqooplines)
    pool.close()
    pool.join()

    logging.info('Total script time %s' % (time.time() - start))
    #print 'It took', time.time()-start,'seconds.'


if __name__ == "__main__":  # allows for the safe importing of the main module
    logging.basicConfig(filename='sqoop_ERS.log', level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s')
    main()
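For anyone who wants to try the fan-out pattern without a Hadoop cluster, here is a minimal, self-contained sketch of the same Pool/map approach; the echo/sleep commands and the worker count of 4 are placeholders, not part of the original job.

import multiprocessing
import subprocess

def work(cmd):
    # each worker hands one shell command to the OS and waits for it to finish
    subprocess.call(cmd, shell=True)

if __name__ == "__main__":
    # hypothetical stand-in commands; in the real script these are the parsed sqoop lines
    commands = ['echo job-%d && sleep 1' % i for i in range(6)]
    pool = multiprocessing.Pool(4)
    pool.map(work, commands)   # blocks until every command has returned
    pool.close()
    pool.join()

With one-second sleeps and four workers, the six commands finish in roughly two seconds instead of six, which is the same effect the gist gets by running the HDFS removals and Sqoop imports concurrently.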
neil90 commented Oct 3, 2015

What the sqoop_ERS.log file looks like:
2015-10-02 13:39:39,729 - INFO - Total number of threads being spun up 24.
2015-10-02 13:39:39,736 - INFO - Total commands being run 6
2015-10-02 13:39:39,736 - INFO - Removing hdfs files...
2015-10-02 13:39:42,170 - INFO - Sqooping...
2015-10-02 13:40:07,251 - INFO - Total script time 27.5447700024
