[PanSTARRS DR1 Downloader] Download the StackObjectThin and StackObjectAttributes Tables from MAST #panstarrs #dr1 #casjobs #sherlock
#!/usr/local/bin/python
# encoding: utf-8
"""
*Download the PanSTARRS DR1 Catalogue from MAST via CasJobs into FITS files containing `querySize` rows each (2,500,000 by default)*

:Author:
    David Young

:Date Created:
    June 18, 2018

Usage:
    panstarrs_dr1_downloader

Options:
    -h, --help            show this help message
"""
################# GLOBAL IMPORTS ####################
import sys
import os
from fundamentals import tools
import yaml
import urllib
from subprocess32 import Popen, PIPE, STDOUT, TimeoutExpired
import uuid as pyuuid
import re
import requests
import shutil
from fundamentals.download import multiobject_download
import time
from datetime import datetime, date
from dateutil.relativedelta import relativedelta

# NUMBER OF ROWS REQUESTED PER QUERY (AND THEREFORE ROWS PER OUTPUT FITS FILE)
querySize = 2500000
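# For reference, a freshly initialised bookkeeper.yaml (written by `main`
# below) looks roughly like this -- eight parallel job slots plus two global
# counters (illustrative sketch only; the real file is whatever yaml.dump
# emits):
#
#   jobs:
#     one:
#       attr_job_id: null
#       max_objid: 0
#       status: null
#       thin_job_id: 0
#     two:
#       ...
#   master_max_objid: 0
#   rows_downloaded: 0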
def main(arguments=None):
    """
    *The main function used when ``panstarrs_dr1_downloader.py`` is run as a single script from the command-line*
    """
    # SETUP THE COMMAND-LINE UTIL SETTINGS
    su = tools(
        arguments=arguments,
        docString=__doc__,
        logLevel="INFO",
        options_first=False,
        projectName=False
    )
    arguments, settings, log, dbConn = su.setup()

    # UNPACK REMAINING CL ARGUMENTS USING `EXEC` TO SETUP THE VARIABLE NAMES
    # AUTOMATICALLY
    for arg, val in arguments.iteritems():
        if arg[0] == "-":
            varname = arg.replace("-", "") + "Flag"
        else:
            varname = arg.replace("<", "").replace(">", "")
        if isinstance(val, str) or isinstance(val, unicode):
            exec(varname + " = '%s'" % (val,))
        else:
            exec(varname + " = %s" % (val,))
        if arg == "--dbConn":
            dbConn = val
        log.debug('%s = %s' % (varname, val,))

    # GENERATE THE BOOKKEEPING YAML DATABASE IF IT DOES NOT ALREADY EXIST
    bkfileName = "bookkeeper.yaml"
    exists = os.path.exists(bkfileName)
    if not exists:
        bookkeeper = {}
        joblist = ["one", "two", "three", "four",
                   "five", "six", "seven", "eight"]
        bookkeeper["jobs"] = {}
        for j in joblist:
            bookkeeper["jobs"][j] = {
                "max_objid": 0,
                "thin_job_id": 0,
                "attr_job_id": None,
                "status": None
            }
        bookkeeper["master_max_objid"] = 0
        bookkeeper["rows_downloaded"] = 0
        stream = file(bkfileName, 'w')
        yaml.dump(bookkeeper, stream, default_flow_style=False)
        stream.close()
        time.sleep(1)

    # OPEN BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # LOOP OVER QUERIES AND DOWNLOADS UNTIL THE HIGHEST OBJID IS REACHED
    while bookkeeper["master_max_objid"] < 215993524933671565:
        # RE-OPEN THE BOOKKEEPER AT THE TOP OF EVERY ITERATION
        log.info('RESTARTING THE LOOP')
        stream = file('bookkeeper.yaml', 'r')
        bookkeeper = yaml.load(stream)
        stream.close()

        log.info('1. starting new queries')
        start_new_queries(log)
        log.info('2. extracting query results from MyDB')
        extract_results(log)
        log.info('3. dropping the MyDB tables')
        drop_tables(log)
        log.info('4. downloading the output fits files')
        download_results(log)
        log.info('5. restarting failed jobs')
        restart_failed_jobs(log)
        log.info('6. killing timed out jobs')
        timeout_long_running_jobs(log)

        # INCREMENTAL BOOKKEEPER BACKUP
        now = datetime.now()
        now = now.strftime("%Y%m%dt%H")
        shutil.copyfile('bookkeeper.yaml',
                        'bookkeeper_%(now)s.yaml' % locals())

    return
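# A note on the job-slot `status` lifecycle the functions below drive
# (reconstructed from the code itself, not from any external documentation):
#
#   None/"complete"  -> "running"    (start_new_queries submits a thin + attr query pair)
#   "running"        -> "extracting" (extract_results triggers FITS extraction once both queries finish)
#   "extracting"     -> "dropping"   (drop_tables queues DROP TABLE jobs for the MyDB tables)
#   "dropping"       -> "complete"   (download_results fetches both FITS files)
#
# Failures send a slot to "failed" (or "onefailed" when only one of the two
# files downloads); restart_failed_jobs re-submits the query pair for the same
# objid range, but only for slots marked "failed".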
def current_jobs(
        log):
    """*check casjobs for any currently running jobs and return job IDs*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - ``jobs`` -- list of currently running job IDs
    """
    log.debug('starting the ``current_jobs`` function')

    cmd = """java -jar /usr/local/bin/casjobs.jar jobs -status running"""
    p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    stdout, stderr = p.communicate()
    log.debug('output: %(stdout)s' % locals())

    jobs = []
    if "no jobs found" not in stdout.lower():
        regex = re.compile(r'JobID: (\d+)')
        regexIterator = regex.finditer(stdout)
        for match in regexIterator:
            jobs.append(int(match.group(1)))

    log.debug('completed the ``current_jobs`` function')
    return jobs
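# For context: the regex above assumes the casjobs CLI lists running jobs with
# lines containing `JobID: <integer>`; the function then returns those
# integers, e.g. [1234567, 1234568] (IDs made up for illustration). Note that
# `current_jobs` is defined as a helper but is not called anywhere else in
# this script as it stands.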
def start_new_queries(
        log):
    """*find empty job slots and start new queries*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``start_new_queries`` function')

    global querySize

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # NUMBER OF ROWS PER FILE
    numberRows = querySize

    # ITERATE OVER THE 8 JOB SLOTS AND TRIGGER NEW QUERIES IF THERE IS SPACE
    # AVAILABLE (i.e. STATUS IS EMPTY OR COMPLETE)
    for k, v in bookkeeper["jobs"].iteritems():
        if not v["status"] or v["status"] == "complete":
            # USE THE OLD MAX OBJID AS THE QUERY LOWER LIMIT - COLLECT THE NEW
            # MAX ID `querySize` ROWS ON
            oldMaxId = int(bookkeeper["master_max_objid"])
            newMaxId = None
            while newMaxId == None:
                newMaxId = get_next_maxid(log)

            # NEW STACK ATTRIBUTES QUERY TO COLLECT THE NEXT BATCH OF ROWS
            uuid = "a" + newMaxId
            query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, gpsfLikelihood, gmomentR1, gmomentRH, gKronRad, gExtNSigma, rpsfLikelihood, rmomentR1, rmomentRH, rKronRad, rExtNSigma, ipsfLikelihood, imomentR1, imomentRH, iKronRad, iExtNSigma, zpsfLikelihood, zmomentR1, zmomentRH, zKronRad, zExtNSigma, ypsfLikelihood, ymomentR1, ymomentRH, yKronRad, yExtNSigma from StackObjectAttributes WHERE primaryDetection = 1 and objid > %(oldMaxId)s order by objid""" % locals()
            jobId = submit_query(log=log, query=query, uuid=uuid)
            bookkeeper["jobs"][k]["attr_job_id"] = int(jobId)
            time.sleep(1)

            # NEW STACK THIN QUERY TO COLLECT THE SAME BATCH OF ROWS
            uuid = "t" + newMaxId
            query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, primaryDetection, bestDetection, gra, gdec, gPSFMag, gPSFMagErr, gApMag, gApMagErr, gKronMag, gKronMagErr, ginfoFlag, ginfoFlag2, ginfoFlag3, rra, rdec, rPSFMag, rPSFMagErr, rApMag, rApMagErr, rKronMag, rKronMagErr, rinfoFlag, rinfoFlag2, rinfoFlag3, ira, idec, iPSFMag, iPSFMagErr, iApMag, iApMagErr, iKronMag, iKronMagErr, iinfoFlag, iinfoFlag2, iinfoFlag3, zra, zdec, zPSFMag, zPSFMagErr, zApMag, zApMagErr, zKronMag, zKronMagErr, zinfoFlag, zinfoFlag2, zinfoFlag3, yra, ydec, yPSFMag, yPSFMagErr, yApMag, yApMagErr, yKronMag, yKronMagErr, yinfoFlag, yinfoFlag2, yinfoFlag3 from StackObjectThin WHERE primaryDetection = 1 and objid > %(oldMaxId)s order by objid""" % locals()
            jobId = submit_query(log=log, query=query, uuid=uuid)
            bookkeeper["jobs"][k]["thin_job_id"] = int(jobId)
            bookkeeper["jobs"][k]["status"] = "running"
            bookkeeper["jobs"][k]["max_objid"] = int(newMaxId)
            bookkeeper["jobs"][k]["min_objid"] = oldMaxId
            bookkeeper["master_max_objid"] = int(newMaxId)

            # UPDATE THE BOOKKEEPING FILE WITH THE QUERY JOB IDS
            stream = file('bookkeeper.yaml', 'w')
            yaml.dump(bookkeeper, stream, default_flow_style=False)
            stream.close()
            time.sleep(1)

    log.debug('completed the ``start_new_queries`` function')
    return None
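# The paging strategy above is worth spelling out: every batch selects the
# `top querySize` rows with `objid > <previous master_max_objid>`, ordered by
# objid, and get_next_maxid() pre-computes the objID that will close that
# batch. Because consecutive batches share the `order by objid` clause and use
# strictly increasing lower bounds, they tile the catalogue without overlap or
# gaps, e.g. (objid values below are made up for illustration):
#
#   batch 1:  objid > 0                  ->  new max_objid = 100003141592653589
#   batch 2:  objid > 100003141592653589 ->  new max_objid = 100006283185307178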
def extract_results(
        log):
    """*check on running jobs and request extraction of the results for those jobs that have finished*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``extract_results`` function')

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # ITERATE THROUGH THE JOBS - IF RUNNING, CHECK WHETHER THE QUERIES ARE
    # COMPLETE
    for k, v in bookkeeper["jobs"].iteritems():
        if v["status"] == "running":
            extract = True
            for thisId in [v["attr_job_id"], v["thin_job_id"]]:
                status = query_job_status(log=log, jobId=thisId)
                if status == "failed" or status == None:
                    extract = False
                    bookkeeper["jobs"][k]["status"] = "failed"
                if status != "complete":
                    extract = False
                    break

            # IF BOTH THE THIN AND ATTR QUERIES ARE COMPLETE THEN TRIGGER AN
            # EXTRACT TO FITS
            if extract:
                # BOTH JOBS COMPLETE - EXTRACT
                maxId = int(v["max_objid"])
                query = "java -jar /usr/local/bin/casjobs.jar extract -a FITS -b a%(maxId)s" % locals()
                os.system(query)
                query = "java -jar /usr/local/bin/casjobs.jar extract -a FITS -b t%(maxId)s" % locals()
                os.system(query)
                bookkeeper["jobs"][k]["status"] = "extracting"

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    log.debug('completed the ``extract_results`` function')
    return None
def download_results(
        log):
    """*download the FITS files for tables that have been extracted and dropped*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``download_results`` function')

    global querySize

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # FIND THE FIRST JOB SLOT WHOSE MYDB TABLES ARE BEING DROPPED AND QUEUE ITS
    # TWO FITS FILES FOR DOWNLOAD
    downloads = []
    filenames = []
    jobNumbers = []
    found = None
    for k, v in bookkeeper["jobs"].iteritems():
        if v["status"] == "dropping" and not found:
            maxId = v["max_objid"]
            urls = ["a", "t"]
            downloaded = True
            for u in urls:
                if u == 't':
                    thisId = v["thin_job_id"]
                else:
                    thisId = v["attr_job_id"]
                status = query_job_status(log=log, jobId=thisId)
                if status != "complete":
                    downloaded = False
                    break
                # IF THE TABLE HAS BEEN DROPPED THE EXPORTED FITS FILE IS READY
                local_filename = "%(u)s%(maxId)0.18d.fits" % locals()
                url = "http://ps1images.stsci.edu/datadelivery/outgoing/casjobs/fits/%(u)s%(maxId)s_daveyoung.fit" % locals()
                downloads.append(url)
                filenames.append(local_filename)
                jobNumbers.append(k)
            found = True

    if len(filenames):
        this = (" and ").join(filenames)
        print "Downloading %(this)s" % locals()
        localUrls = multiobject_download(
            urlList=downloads,
            downloadDirectory="./",
            log=log,
            timeStamp=False,
            timeout=500,
            concurrentDownloads=2,
            resetFilename=filenames,
            credentials=False,
            longTime=False,
            indexFilenames=False
        )

        # MARK A JOB COMPLETE ONLY IF BOTH OF ITS FITS FILES DOWNLOADED
        downloaded = []
        for l, j in zip(localUrls, jobNumbers):
            if l and j in downloaded:
                bookkeeper["jobs"][j]["status"] = "complete"
                bookkeeper["rows_downloaded"] = int(
                    bookkeeper["rows_downloaded"]) + querySize
            elif l:
                downloaded.append(j)
            elif not l and j in downloaded:
                bookkeeper["jobs"][j]["status"] = "onefailed"
        for j in jobNumbers:
            if j not in downloaded:
                bookkeeper["jobs"][j]["status"] = "failed"

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    log.debug('completed the ``download_results`` function')
    return None
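# Filename convention used above: the remote file exported by CasJobs is named
# `<prefix><maxId>_daveyoung.fit` (prefix `a` for StackObjectAttributes, `t`
# for StackObjectThin, with the author's CasJobs username baked in), and it is
# saved locally as `<prefix><maxId zero-padded to 18 digits>.fits`, e.g.
# a000100003141592653.fits (made-up objid, for illustration only).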
def drop_tables(
        log):
    """*if tables are being extracted, try to drop the MyDB database tables*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``drop_tables`` function')

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # TEST IF PREVIOUS DROPS WORKED - IF NOT, TRY AGAIN
    for k, v in bookkeeper["jobs"].iteritems():
        if v["status"] == "dropping":
            maxId = v["max_objid"]
            urls = ["a", "t"]
            for u in urls:
                if u == 't':
                    thisId = v["thin_job_id"]
                else:
                    thisId = v["attr_job_id"]
                status = query_job_status(log=log, jobId=thisId)
                if status == None:
                    bookkeeper["jobs"][k]["status"] = "failed"
                elif status == "failed":
                    # TEST IF THE TABLE STILL EXISTS IN MYDB
                    query = """java -jar /usr/local/bin/casjobs.jar tables"""
                    p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
                    stdout, stderr = p.communicate()
                    if "%(u)s%(maxId)s" % locals() not in stdout:
                        bookkeeper["jobs"][k]["status"] = "failed"
                    else:
                        # SHOULD NOT BE ABLE TO DROP THE TABLE IF THE EXPORT IS
                        # STILL ONGOING
                        query = """drop table %(u)s%(maxId)s""" % locals()
                        jobId = submit_query(
                            log=log, query=query, uuid=pyuuid.uuid1(), scheme="MyDB")
                        if u == "a":
                            bookkeeper["jobs"][k][
                                "attr_job_id"] = int(jobId)
                        else:
                            bookkeeper["jobs"][k][
                                "thin_job_id"] = int(jobId)

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    # FOR TABLES THAT ARE BEING EXTRACTED, TRY TO DROP THE TABLE - THE DROP
    # WILL FAIL IF THE EXTRACT IS STILL RUNNING
    for k, v in bookkeeper["jobs"].iteritems():
        if v["status"] == "extracting":
            maxId = v["max_objid"]
            prefix = ["a", "t"]
            for u in prefix:
                # SHOULD NOT BE ABLE TO DROP THE TABLE IF THE EXPORT IS ONGOING
                query = """drop table %(u)s%(maxId)s""" % locals()
                jobId = submit_query(log=log, query=query,
                                     uuid=pyuuid.uuid1(), scheme="MyDB")
                if u == "a":
                    bookkeeper["jobs"][k]["attr_job_id"] = int(jobId)
                else:
                    bookkeeper["jobs"][k]["thin_job_id"] = int(jobId)
            bookkeeper["jobs"][k]["status"] = "dropping"

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    log.debug('completed the ``drop_tables`` function')
    return None
def restart_failed_jobs(
        log):
    """*try to restart failed queries*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``restart_failed_jobs`` function')

    global querySize

    # NUMBER OF ROWS PER FILE
    numberRows = querySize

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # RESTART FAILED JOBS
    for k, v in bookkeeper["jobs"].iteritems():
        if v["status"] == "failed":
            maxId = v["max_objid"]
            minId = int(v["min_objid"])
            urls = ["a", "t"]

            # MAKE SURE BOTH JOBS HAVE COMPLETED OR FAILED BEFORE RESTARTING
            completed = True
            for u in urls:
                if u == 't':
                    thisId = v["thin_job_id"]
                else:
                    thisId = v["attr_job_id"]
                status = query_job_status(log, thisId)
                if status and status not in ["failed", "complete"]:
                    completed = False
                    print "job %(thisId)s still has status: %(status)s" % locals()
            if not completed:
                continue

            # DROP ANY LEFTOVER MYDB TABLES FROM THE FAILED ATTEMPT BEFORE
            # RESUBMITTING THE QUERIES
            tableExists = True
            while tableExists:
                tableExists = False
                for u in urls:
                    if u == 't':
                        thisId = v["thin_job_id"]
                    else:
                        thisId = v["attr_job_id"]
                    # TEST IF THE TABLE EXISTS
                    query = """java -jar /usr/local/bin/casjobs.jar tables"""
                    p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
                    stdout, stderr = p.communicate()
                    thisTable = "%(u)s%(maxId)s" % locals()
                    if thisTable in stdout:
                        # TRY TO DROP THE TABLE
                        tableExists = True
                        query = """java -jar /usr/local/bin/casjobs.jar execute -t "MyDB" "drop table %(thisTable)s" """ % locals()
                        p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
                        stdout, stderr = p.communicate()
                        time.sleep(1)

            # RESUBMIT THE STACK ATTRIBUTES QUERY FOR THE SAME OBJID RANGE
            uuid = "a" + str(maxId)
            query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, gpsfLikelihood, gmomentR1, gmomentRH, gKronRad, gExtNSigma, rpsfLikelihood, rmomentR1, rmomentRH, rKronRad, rExtNSigma, ipsfLikelihood, imomentR1, imomentRH, iKronRad, iExtNSigma, zpsfLikelihood, zmomentR1, zmomentRH, zKronRad, zExtNSigma, ypsfLikelihood, ymomentR1, ymomentRH, yKronRad, yExtNSigma from StackObjectAttributes WHERE primaryDetection = 1 and objid > %(minId)s order by objid""" % locals()
            jobId = submit_query(log=log, query=query, uuid=uuid)
            bookkeeper["jobs"][k]["attr_job_id"] = int(jobId)
            time.sleep(1)

            # RESUBMIT THE STACK THIN QUERY FOR THE SAME OBJID RANGE
            uuid = "t" + str(maxId)
            query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, primaryDetection, bestDetection, gra, gdec, gPSFMag, gPSFMagErr, gApMag, gApMagErr, gKronMag, gKronMagErr, ginfoFlag, ginfoFlag2, ginfoFlag3, rra, rdec, rPSFMag, rPSFMagErr, rApMag, rApMagErr, rKronMag, rKronMagErr, rinfoFlag, rinfoFlag2, rinfoFlag3, ira, idec, iPSFMag, iPSFMagErr, iApMag, iApMagErr, iKronMag, iKronMagErr, iinfoFlag, iinfoFlag2, iinfoFlag3, zra, zdec, zPSFMag, zPSFMagErr, zApMag, zApMagErr, zKronMag, zKronMagErr, zinfoFlag, zinfoFlag2, zinfoFlag3, yra, ydec, yPSFMag, yPSFMagErr, yApMag, yApMagErr, yKronMag, yKronMagErr, yinfoFlag, yinfoFlag2, yinfoFlag3 from StackObjectThin WHERE primaryDetection = 1 and objid > %(minId)s order by objid""" % locals()
            jobId = submit_query(log=log, query=query, uuid=uuid)
            bookkeeper["jobs"][k]["thin_job_id"] = int(jobId)
            bookkeeper["jobs"][k]["status"] = "running"

            # UPDATE BOOKKEEPER
            stream = file('bookkeeper.yaml', 'w')
            yaml.dump(bookkeeper, stream, default_flow_style=False)
            stream.close()
            time.sleep(1)

    log.debug('completed the ``restart_failed_jobs`` function')
    return None
def timeout_long_running_jobs(
        log):
    """*check for jobs that have been running/stuck for a long period of time and cancel them to unclog the queue*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``timeout_long_running_jobs`` function')

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # GRAB ALL RUNNING JOBS
    query = """java -jar /usr/local/bin/casjobs.jar j -t started"""
    p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
    stdout, stderr = p.communicate()
    regex = re.compile(
        r'Jobid: (?P<jobid>\d+).*?\nStatus:\s+(?P<status>.*?)\s+submitted: (?P<datetime>.*?)\s+Completed', re.S | re.I)

    now = datetime.utcnow()
    thisIter = regex.finditer(stdout)
    for item in thisIter:
        thisId = int(item.group("jobid"))

        # PARSE THE SUBMISSION TIME (WITH SOME AD-HOC CORRECTIONS FOR THE
        # FORMAT AND CLOCK THE CASJOBS SERVER REPORTS)
        try:
            jobStart = datetime.strptime(
                item.group("datetime"), '%m/%d/%Y %H:%M')
            jobStart = jobStart + relativedelta(months=1)
        except:
            jobStart = datetime.strptime(item.group(
                "datetime").replace("0/", "1/", 1), '%m/%d/%Y %H:%M')
        if now.hour > jobStart.hour:
            jobStart = jobStart + relativedelta(hours=12)
        jobStart = jobStart - relativedelta(hours=1)

        # CONVERT THE ELAPSED TIME TO MINUTES
        diff = now - jobStart
        diffMin = diff.days * 24 * 60 + diff.seconds / 60

        # MARK ANY JOB RUNNING FOR MORE THAN 3 HOURS AS FAILED
        if diffMin > 180:
            for k, v in bookkeeper["jobs"].iteritems():
                if bookkeeper["jobs"][k]["attr_job_id"] == thisId:
                    bookkeeper["jobs"][k]["status"] = "failed"
                if bookkeeper["jobs"][k]["thin_job_id"] == thisId:
                    bookkeeper["jobs"][k]["status"] = "failed"
            # print "open http://mastweb.stsci.edu/ps1casjobs/cancelJob.aspx?id=%(thisId)s&CancelJob=Cancel+Job" % locals()

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    log.debug('completed the ``timeout_long_running_jobs`` function')
    return None
def submit_query(
        log,
        query,
        uuid,
        scheme="PanSTARRS_DR1"):
    """*submit a query and return the job id*

    **Key Arguments:**
        - ``log`` -- logger
        - ``query`` -- the MAST casjobs query to submit
        - ``uuid`` -- the UUID with which to name the query and resulting database table
        - ``scheme`` -- the scheme to query

    **Return:**
        - ``jobId`` -- the job id from the submitted query
    """
    log.debug('starting the ``submit_query`` function')

    query = """java -jar /usr/local/bin/casjobs.jar submit -t %(scheme)s -n "%(uuid)s" "%(query)s" """ % locals()
    p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
    stdout, stderr = p.communicate()
    log.debug('output: %(stdout)s' % locals())

    matchObject = re.search(r"JobID is (\d+)", stdout)
    if matchObject:
        jobId = matchObject.group(1)
    else:
        jobId = None

    log.debug('completed the ``submit_query`` function')
    return jobId
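# Illustrative standalone use of submit_query (not executed here; the query
# text and the `log` object are placeholders for whatever you have in scope):
#
#   jobId = submit_query(
#       log=log,
#       query="select top 10 objID from StackObjectThin",
#       uuid="mytesttable")
#   print query_job_status(log, jobId)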
def get_next_maxid(
        log):
    """*get the next maximum ID for the next batch of rows to request from MAST*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - ``maxid`` -- the maximum ID of the next batch of rows to request
    """
    log.debug('starting the ``get_next_maxid`` function')

    global querySize

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # NUMBER OF ROWS PER FILE
    numberRows = querySize
    oldMaxId = int(bookkeeper["master_max_objid"])
    offset = numberRows - 1

    # FIND THE OBJID THAT WILL CLOSE THE NEXT BATCH
    query = """SELECT objID as maxId into myDB.maxid from StackObjectAttributes WHERE primaryDetection = 1 and objid > %(oldMaxId)s order by objid offset %(offset)s rows fetch next 1 rows only""" % locals()
    jobId = submit_query(log=log, query=query, uuid="maxid")

    # WAIT FOR THE QUERY TO FINISH THEN READ THE SINGLE VALUE BACK OUT OF MYDB
    waiting = True
    while waiting:
        time.sleep(10)
        status = query_job_status(log, jobId)
        if status and status.lower() == "complete":
            waiting = False
            query = """java -jar /usr/local/bin/casjobs.jar execute -t "MyDB" "select maxId from maxid" """
            p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
            stdout, stderr = p.communicate()
            log.debug('output: %(stdout)s' % locals())
            matchObject = re.search(r"\d{5,30}", stdout)
            if matchObject:
                maxid = matchObject.group()
            else:
                maxid = None
        elif status and status.lower() == "failed":
            log.error('could not get maxid')
            waiting = False
            maxid = None

    # DROP THE TEMPORARY `maxid` TABLE IF IT EXISTS
    query = """java -jar /usr/local/bin/casjobs.jar tables"""
    p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
    stdout, stderr = p.communicate()
    if "maxid" in stdout.lower():
        query = """drop table maxid"""
        jobId = submit_query(
            log=log, query=query, uuid=pyuuid.uuid1(), scheme="MyDB")
        waiting = True
        while waiting:
            time.sleep(10)
            status = query_job_status(log, jobId)
            if status and status.lower() == "complete":
                waiting = False

    log.debug('completed the ``get_next_maxid`` function')
    return maxid
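# Worked example of the offset arithmetic above (numbers are illustrative
# only): with querySize = 2500000 the query skips offset = 2499999 rows past
# the current master_max_objid and fetches the single next row, so the objID
# it returns is exactly the 2,500,000th row of the batch that
# start_new_queries is about to request with
# `SELECT top 2500000 ... objid > oldMaxId order by objid`.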
def query_job_status(
        log,
        jobId):
    """*query the status of a job*

    **Key Arguments:**
        - ``log`` -- logger
        - ``jobId`` -- the casjob job id to request a status for

    **Return:**
        - ``status`` -- the status of the job
    """
    log.debug('starting the ``query_job_status`` function')

    status = None
    attempt = 0
    while status == None and attempt != 3:
        query = """java -jar /usr/local/bin/casjobs.jar jobs -j %(jobId)s""" % locals()
        log.debug("""query: `%(query)s`""" % locals())
        p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
        stdout, stderr = p.communicate()
        log.debug('output: %(stdout)s' % locals())
        matchObject = re.search(r"Status: (\S+)", stdout)
        if matchObject:
            status = matchObject.group(1)
            # A QUERY THAT FAILED ONLY BECAUSE THE TARGET TABLE ALREADY EXISTS
            # IS TREATED AS COMPLETE
            if status == "failed" and "There is already an object" in stdout:
                status = "complete"
        else:
            log.error('could not find status of jobid %(jobId)s' % locals())
            attempt += 1
            time.sleep(1)

    log.debug('completed the ``query_job_status`` function')
    return status
if __name__ == '__main__':
    main()
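# Typical invocation (assumed from the docopt usage string at the top of this
# file; adjust to your own environment):
#
#   nohup python panstarrs_dr1_downloader.py > downloader.log 2>&1 &
#
# The script writes bookkeeper.yaml (plus hour-stamped bookkeeper_<YYYYMMDDtHH>.yaml
# backups) into the working directory, so it can be stopped and restarted and
# will resume from the master_max_objid recorded there.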