[PanSTARRS DR1 Downloader] Download the StackObjectThin and StackObjectAttributes Tables from MAST #panstarrs #dr1 #casjobs #sherlock
#!/usr/local/bin/python
# encoding: utf-8
"""
*Download the PanSTARRS DR1 Catalogue from MAST via CasJobs into FITS files containing 2,500,000 (`querySize`) rows each*

:Author:
    David Young

:Date Created:
    June 18, 2018

Usage:
    panstarrs_dr1_downloader

Options:
    -h, --help            show this help message
"""
################# GLOBAL IMPORTS ####################
import sys
import os
from fundamentals import tools
import yaml
import urllib
from subprocess32 import Popen, PIPE, STDOUT, TimeoutExpired
import uuid as pyuuid
import re
import requests
import shutil
from fundamentals.download import multiobject_download
import time
from datetime import datetime, date
from dateutil.relativedelta import relativedelta

querySize = 2500000
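
# `querySize` ABOVE SETS THE NUMBER OF ROWS REQUESTED PER CASJOBS QUERY, AND
# THEREFORE PER DOWNLOADED FITS FILE. A SKETCH OF THE bookkeeper.yaml FILE THIS
# SCRIPT MAINTAINS (created on first run in `main` below; the job IDs shown are
# placeholders and the statuses are the ones assigned elsewhere in this script):
#
#   jobs:
#     one:
#       attr_job_id: 1234567   # casjobs job ID of the StackObjectAttributes query
#       thin_job_id: 1234568   # casjobs job ID of the StackObjectThin query
#       min_objid: 0           # lower objID bound of this batch (added at runtime)
#       max_objid: 0           # upper objID bound of this batch
#       status: running        # None|running|extracting|dropping|complete|onefailed|failed
#     two: ...                 # ... through to "eight"
#   master_max_objid: 0        # highest objID requested from the catalogue so far
#   rows_downloaded: 0         # running total of rows landed on disk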


def main(arguments=None):
    """
    *The main function used when ``panstarrs_dr1_downloader.py`` is run as a single script from the command-line*
    """
    # SETUP THE COMMAND-LINE UTIL SETTINGS
    su = tools(
        arguments=arguments,
        docString=__doc__,
        logLevel="INFO",
        options_first=False,
        projectName=False
    )
    arguments, settings, log, dbConn = su.setup()

    # UNPACK REMAINING CL ARGUMENTS USING `EXEC` TO SETUP THE VARIABLE NAMES
    # AUTOMATICALLY
    for arg, val in arguments.iteritems():
        if arg[0] == "-":
            varname = arg.replace("-", "") + "Flag"
        else:
            varname = arg.replace("<", "").replace(">", "")
        if isinstance(val, str) or isinstance(val, unicode):
            exec(varname + " = '%s'" % (val,))
        else:
            exec(varname + " = %s" % (val,))
        if arg == "--dbConn":
            dbConn = val
        log.debug('%s = %s' % (varname, val,))

    # GENERATE BOOKKEEPING YAML DATABASE
    bkfileName = "bookkeeper.yaml"
    exists = os.path.exists(bkfileName)
    if not exists:
        bookkeeper = {}
        joblist = ["one", "two", "three", "four",
                   "five", "six", "seven", "eight"]
        bookkeeper["jobs"] = {}
        for j in joblist:
            bookkeeper["jobs"][j] = {
                "max_objid": 0,
                "thin_job_id": 0,
                "attr_job_id": None,
                "status": None
            }
        bookkeeper["master_max_objid"] = 0
        bookkeeper["rows_downloaded"] = 0
        stream = file(bkfileName, 'w')
        yaml.dump(bookkeeper, stream, default_flow_style=False)
        stream.close()
        time.sleep(1)

    # OPEN BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # LOOP OVER QUERIES AND DOWNLOADS UNTIL THE HIGHEST OBJID IS REACHED
    while bookkeeper["master_max_objid"] < 215993524933671565:
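        # NOTE: 215993524933671565 IS USED AS THE UPPER BOUND OF THE objID
        # RANGE - THE LOOP KEEPS REQUESTING BATCHES UNTIL master_max_objid
        # PASSES IT (THE EXACT VALUE IS ASSUMED TO SIT ABOVE THE LARGEST objID
        # IN THE DR1 StackObject TABLES)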
        # OPEN BOOKKEEPER
        log.info('RESTARTING THE LOOP')
        stream = file('bookkeeper.yaml', 'r')
        bookkeeper = yaml.load(stream)
        stream.close()

        log.info('1. starting new queries')
        start_new_queries(log)
        log.info('2. extracting query results from MyDB')
        extract_results(log)
        log.info('3. dropping the MyDB tables')
        drop_tables(log)
        log.info('4. downloading the output fits files')
        download_results(log)
        log.info('5. restarting failed jobs')
        restart_failed_jobs(log)
        log.info('6. killing timed out jobs')
        timeout_long_running_jobs(log)

        # INCREMENTAL BOOKKEEPER BACKUP
        now = datetime.now()
        now = now.strftime("%Y%m%dt%H")
        shutil.copyfile('bookkeeper.yaml',
                        'bookkeeper_%(now)s.yaml' % locals())

    return


def current_jobs(
        log):
    """*check casjobs for any currently running jobs and return job IDs*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - ``jobs`` -- list of currently running job IDs
    """
    log.debug('starting the ``current_jobs`` function')

    cmd = """java -jar /usr/local/bin/casjobs.jar jobs -status running"""
    p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    stdout, stderr = p.communicate()
    log.debug('output: %(stdout)s' % locals())
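    # THE casjobs.jar CLI IS ASSUMED TO LIST EACH RUNNING JOB ON A LINE
    # CONTAINING e.g. "JobID: 1234567", OR TO REPORT "no jobs found"; THE
    # REGEX BELOW PULLS OUT THE NUMERIC IDs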
if "no jobs found" not in stdout.lower(): | |
jobs = [] | |
regex = re.compile(r'JobID: (\d+)') | |
regexIterator = regex.finditer(stdout) | |
for match in regexIterator: | |
jobs.append(int(match.group(1))) | |
else: | |
jobs = [] | |
log.debug('completed the ``current_jobs`` function') | |
return jobs | |


def start_new_queries(
        log):
    """*Find empty jobs and start new queries*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``start_new_queries`` function')
    global querySize

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # NUMBER OF ROWS PER FILE
    numberRows = querySize

    # ITERATE OVER THE 8 JOBS AND TRIGGER NEW QUERIES IF THERE IS SPACE
    # AVAILABLE (i.e. STATUS IS COMPLETE)
    for k, v in bookkeeper["jobs"].iteritems():
        if not v["status"] or v["status"] == "complete":
            # USE THE OLD MAX OBJID FOR THE QUERY LOWER LIMIT - COLLECT THE
            # NEW MAX ID `querySize` ROWS ON
            oldMaxId = int(bookkeeper["master_max_objid"])
            newMaxId = None
            while newMaxId == None:
                newMaxId = get_next_maxid(log)
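            # `get_next_maxid` RETURNS THE objID SITTING `querySize` ROWS
            # BEYOND `oldMaxId`, SO THE ATTRIBUTES AND THIN QUERIES BELOW BOTH
            # SELECT EXACTLY THE SAME objID WINDOW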
            # NEW STACK ATTRIBUTES QUERY TO COLLECT THE NEXT `querySize` ROWS
            uuid = "a" + newMaxId
            query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, gpsfLikelihood, gmomentR1, gmomentRH, gKronRad, gExtNSigma, rpsfLikelihood, rmomentR1, rmomentRH, rKronRad, rExtNSigma, ipsfLikelihood, imomentR1, imomentRH, iKronRad, iExtNSigma, zpsfLikelihood, zmomentR1, zmomentRH, zKronRad, zExtNSigma, ypsfLikelihood, ymomentR1, ymomentRH, yKronRad, yExtNSigma from StackObjectAttributes WHERE primaryDetection = 1 and objid > %(oldMaxId)s order by objid""" % locals()
            jobId = submit_query(log=log, query=query, uuid=uuid)
            bookkeeper["jobs"][k]["attr_job_id"] = int(jobId)
            time.sleep(1)

            # NEW STACK THIN QUERY TO COLLECT THE NEXT `querySize` ROWS
            uuid = "t" + newMaxId
            query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, primaryDetection, bestDetection, gra, gdec, gPSFMag, gPSFMagErr, gApMag, gApMagErr, gKronMag, gKronMagErr, ginfoFlag, ginfoFlag2, ginfoFlag3, rra, rdec, rPSFMag, rPSFMagErr, rApMag, rApMagErr, rKronMag, rKronMagErr, rinfoFlag, rinfoFlag2, rinfoFlag3, ira, idec, iPSFMag, iPSFMagErr, iApMag, iApMagErr, iKronMag, iKronMagErr, iinfoFlag, iinfoFlag2, iinfoFlag3, zra, zdec, zPSFMag, zPSFMagErr, zApMag, zApMagErr, zKronMag, zKronMagErr, zinfoFlag, zinfoFlag2, zinfoFlag3, yra, ydec, yPSFMag, yPSFMagErr, yApMag, yApMagErr, yKronMag, yKronMagErr, yinfoFlag, yinfoFlag2, yinfoFlag3 from StackObjectThin WHERE primaryDetection = 1 and objid > %(oldMaxId)s order by objid""" % locals()
            jobId = submit_query(log=log, query=query, uuid=uuid)
            bookkeeper["jobs"][k]["thin_job_id"] = int(jobId)
            bookkeeper["jobs"][k]["status"] = "running"
            bookkeeper["jobs"][k]["max_objid"] = int(newMaxId)
            bookkeeper["jobs"][k]["min_objid"] = oldMaxId
            bookkeeper["master_max_objid"] = int(newMaxId)

            # UPDATE BOOKKEEPING FILE WITH THE QUERY JOB IDS
            stream = file('bookkeeper.yaml', 'w')
            yaml.dump(bookkeeper, stream, default_flow_style=False)
            stream.close()
            time.sleep(1)

    log.debug('completed the ``start_new_queries`` function')
    return None


def extract_results(
        log):
    """*check on running jobs and request to extract the results for those jobs that have finished*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``extract_results`` function')

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # ITERATE THROUGH THE JOBS - IF RUNNING, CHECK WHETHER THE JOB IS COMPLETE
    for k, v in bookkeeper["jobs"].iteritems():
        if v["status"] == "running":
            for thisId in [v["attr_job_id"], v["thin_job_id"]]:
                extract = True
                status = query_job_status(log=log, jobId=thisId)
                if status == "failed" or status == None:
                    extract = False
                    bookkeeper["jobs"][k]["status"] = "failed"
                if status != "complete":
                    extract = False
                    break

            # IF BOTH THIN AND ATTR QUERIES ARE COMPLETE THEN TRIGGER AN
            # EXTRACT TO FITS
            if extract:
                # BOTH JOBS COMPLETE - EXTRACT
                maxId = int(v["max_objid"])
                query = "java -jar /usr/local/bin/casjobs.jar extract -a FITS -b a%(maxId)s" % locals()
                os.system(query)
                query = "java -jar /usr/local/bin/casjobs.jar extract -a FITS -b t%(maxId)s" % locals()
                os.system(query)
                bookkeeper["jobs"][k]["status"] = "extracting"

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    log.debug('completed the ``extract_results`` function')
    return None


def download_results(
        log):
    """*download the FITS files for tables that have been extracted and dropped*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``download_results`` function')
    global querySize

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    downloads = []
    filenames = []
    jobNumbers = []
    found = None
    for k, v in bookkeeper["jobs"].iteritems():
        if v["status"] == "dropping" and not found:
            maxId = v["max_objid"]
            urls = ["a", "t"]
            downloaded = True
            for u in urls:
                if u == 't':
                    thisId = v["thin_job_id"]
                else:
                    thisId = v["attr_job_id"]
                status = query_job_status(log=log, jobId=thisId)
                if status != "complete":
                    downloaded = False
                    break

                # THE TABLE HAS BEEN DROPPED - QUEUE THE RESULTS FOR DOWNLOAD
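                # THE EXTRACTED FITS FILE IS ASSUMED TO LAND IN THE MAST
                # "outgoing" AREA AS <tablename>_<casjobs-username>.fit; THE
                # USERNAME IN THE URL BELOW IS HARD-CODED TO THE GIST AUTHOR'S
                # ACCOUNT (daveyoung) AND WOULD NEED CHANGING FOR ANOTHER USER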
                local_filename = "%(u)s%(maxId)0.18d.fits" % locals()
                url = "http://ps1images.stsci.edu/datadelivery/outgoing/casjobs/fits/%(u)s%(maxId)s_daveyoung.fit" % locals()
                downloads.append(url)
                filenames.append(local_filename)
                jobNumbers.append(k)
                found = True

    if len(filenames):
        this = (" and ").join(filenames)
        print "Downloading %(this)s" % locals()
        localUrls = multiobject_download(
            urlList=downloads,
            downloadDirectory="./",
            log=log,
            timeStamp=False,
            timeout=500,
            concurrentDownloads=2,
            resetFilename=filenames,
            credentials=False,
            longTime=False,
            indexFilenames=False
        )

        downloaded = []
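        # EACH JOB CONTRIBUTES TWO FILES (ATTRIBUTES + THIN). THE FIRST
        # SUCCESSFUL FILE FOR A JOB ADDS IT TO `downloaded`; THE SECOND MARKS
        # THE JOB "complete" AND CREDITS `querySize` ROWS. A MISSING FILE
        # DEMOTES THE JOB TO "onefailed" OR "failed" SO IT GETS RE-QUERIED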
        for l, j in zip(localUrls, jobNumbers):
            if l and j in downloaded:
                bookkeeper["jobs"][j]["status"] = "complete"
                bookkeeper["rows_downloaded"] = int(
                    bookkeeper["rows_downloaded"]) + querySize
            elif l:
                downloaded.append(j)
            elif not l and j in downloaded:
                bookkeeper["jobs"][j]["status"] = "onefailed"
        for j in jobNumbers:
            if j not in downloaded:
                bookkeeper["jobs"][j]["status"] = "failed"

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    log.debug('completed the ``download_results`` function')
    return None


def drop_tables(
        log):
    """*if tables are being extracted try and drop the database tables*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``drop_tables`` function')

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()
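    # TWO PASSES FOLLOW: FIRST, FOR JOBS ALREADY MARKED "dropping", CHECK
    # WHETHER THE PREVIOUS DROP WORKED AND RETRY IF NOT; SECOND, FOR JOBS
    # MARKED "extracting", ISSUE A DROP (WHICH CASJOBS REFUSES WHILE THE
    # EXPORT IS STILL RUNNING) AND MOVE THEM ON TO "dropping"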
    # TEST IF PREVIOUS DROPS WORKED - IF NOT TRY AGAIN
    for k, v in bookkeeper["jobs"].iteritems():
        if v["status"] == "dropping":
            maxId = v["max_objid"]
            urls = ["a", "t"]
            for u in urls:
                if u == 't':
                    thisId = v["thin_job_id"]
                else:
                    thisId = v["attr_job_id"]
                status = query_job_status(log=log, jobId=thisId)
                if status == None:
                    bookkeeper["jobs"][k]["status"] = "failed"
                elif status == "failed":
                    # TEST IF THE TABLE STILL EXISTS IN MyDB
                    query = """java -jar /usr/local/bin/casjobs.jar tables """
                    p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
                    stdout, stderr = p.communicate()
                    if "%(u)s%(maxId)s" % locals() not in stdout:
                        bookkeeper["jobs"][k]["status"] = "failed"
                    else:
                        # SHOULD NOT BE ABLE TO DROP THE TABLE IF THE EXPORT
                        # IS ONGOING
                        query = """drop table %(u)s%(maxId)s""" % locals()
                        jobId = submit_query(
                            log=log, query=query, uuid=pyuuid.uuid1(), scheme="MyDB")
                        if u == "a":
                            bookkeeper["jobs"][k]["attr_job_id"] = int(jobId)
                        else:
                            bookkeeper["jobs"][k]["thin_job_id"] = int(jobId)

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    # FOR TABLES THAT ARE BEING EXTRACTED TRY AND DROP THE TABLE - THIS WILL
    # FAIL IF THE EXTRACT IS STILL RUNNING
    for k, v in bookkeeper["jobs"].iteritems():
        if v["status"] == "extracting":
            maxId = v["max_objid"]
            prefix = ["a", "t"]
            for u in prefix:
                # SHOULD NOT BE ABLE TO DROP THE TABLE IF THE EXPORT IS ONGOING
                query = """drop table %(u)s%(maxId)s""" % locals()
                jobId = submit_query(log=log, query=query,
                                     uuid=pyuuid.uuid1(), scheme="MyDB")
                if u == "a":
                    bookkeeper["jobs"][k]["attr_job_id"] = int(jobId)
                else:
                    bookkeeper["jobs"][k]["thin_job_id"] = int(jobId)
            bookkeeper["jobs"][k]["status"] = "dropping"

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    log.debug('completed the ``drop_tables`` function')
    return None


def restart_failed_jobs(
        log):
    """*try and restart failed queries*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``restart_failed_jobs`` function')
    global querySize

    # NUMBER OF ROWS PER FILE
    numberRows = querySize

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()
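    # RESTART STRATEGY: ONCE NEITHER OF A FAILED JOB'S QUERIES IS STILL
    # RUNNING, DROP ANY HALF-BUILT MyDB TABLES LEFT BEHIND AND THEN RE-SUBMIT
    # BOTH QUERIES OVER THE SAME objID WINDOW (min_objid -> max_objid)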
    # RESTART FAILED JOBS
    for k, v in bookkeeper["jobs"].iteritems():
        if v["status"] == "failed":
            maxId = v["max_objid"]
            minId = int(v["min_objid"])
            urls = ["a", "t"]

            # MAKE SURE BOTH JOBS ARE COMPLETED/FAILED
            completed = True
            for u in urls:
                if u == 't':
                    thisId = v["thin_job_id"]
                else:
                    thisId = v["attr_job_id"]
                status = query_job_status(log, thisId)
                if status and status not in ["failed", "complete"]:
                    completed = False
                    print "job %(thisId)s status: %(status)s" % locals()
            if not completed:
                continue

            tableExists = True
            while tableExists:
                tableExists = False
                for u in urls:
                    if u == 't':
                        thisId = v["thin_job_id"]
                    else:
                        thisId = v["attr_job_id"]
                    # TEST IF TABLE EXISTS
                    query = """java -jar /usr/local/bin/casjobs.jar tables """
                    p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
                    stdout, stderr = p.communicate()
                    thisTable = "%(u)s%(maxId)s" % locals()
                    if thisTable in stdout:
                        # TRY AND DROP THE TABLE
                        tableExists = True
                        query = """java -jar /usr/local/bin/casjobs.jar execute -t "MyDB" "drop table %(thisTable)s" """ % locals()
                        p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
                        stdout, stderr = p.communicate()
                time.sleep(1)

            # NEW STACK ATTRIBUTES QUERY TO RE-COLLECT THE SAME `querySize` ROWS
            uuid = "a" + str(maxId)
            query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, gpsfLikelihood, gmomentR1, gmomentRH, gKronRad, gExtNSigma, rpsfLikelihood, rmomentR1, rmomentRH, rKronRad, rExtNSigma, ipsfLikelihood, imomentR1, imomentRH, iKronRad, iExtNSigma, zpsfLikelihood, zmomentR1, zmomentRH, zKronRad, zExtNSigma, ypsfLikelihood, ymomentR1, ymomentRH, yKronRad, yExtNSigma from StackObjectAttributes WHERE primaryDetection = 1 and objid > %(minId)s order by objid""" % locals()
            jobId = submit_query(log=log, query=query, uuid=uuid)
            bookkeeper["jobs"][k]["attr_job_id"] = int(jobId)
            time.sleep(1)

            # NEW STACK THIN QUERY TO RE-COLLECT THE SAME `querySize` ROWS
            uuid = "t" + str(maxId)
            query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, primaryDetection, bestDetection, gra, gdec, gPSFMag, gPSFMagErr, gApMag, gApMagErr, gKronMag, gKronMagErr, ginfoFlag, ginfoFlag2, ginfoFlag3, rra, rdec, rPSFMag, rPSFMagErr, rApMag, rApMagErr, rKronMag, rKronMagErr, rinfoFlag, rinfoFlag2, rinfoFlag3, ira, idec, iPSFMag, iPSFMagErr, iApMag, iApMagErr, iKronMag, iKronMagErr, iinfoFlag, iinfoFlag2, iinfoFlag3, zra, zdec, zPSFMag, zPSFMagErr, zApMag, zApMagErr, zKronMag, zKronMagErr, zinfoFlag, zinfoFlag2, zinfoFlag3, yra, ydec, yPSFMag, yPSFMagErr, yApMag, yApMagErr, yKronMag, yKronMagErr, yinfoFlag, yinfoFlag2, yinfoFlag3 from StackObjectThin WHERE primaryDetection = 1 and objid > %(minId)s order by objid""" % locals()
            jobId = submit_query(log=log, query=query, uuid=uuid)
            bookkeeper["jobs"][k]["thin_job_id"] = int(jobId)
            bookkeeper["jobs"][k]["status"] = "running"

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    log.debug('completed the ``restart_failed_jobs`` function')
    return None


def timeout_long_running_jobs(
        log):
    """*check for jobs that have been running/stuck for a long period of time and cancel them to unclog the queue*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - None
    """
    log.debug('starting the ``timeout_long_running_jobs`` function')

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # GRAB ALL RUNNING JOBS
    query = """java -jar /usr/local/bin/casjobs.jar j -t started """
    p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
    stdout, stderr = p.communicate()
    regex = re.compile(
        r'Jobid: (?P<jobid>\d+).*?\nStatus:\s+(?P<status>.*?)\s+submitted: (?P<datetime>.*?)\s+Completed', re.S | re.I)
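    # THE casjobs.jar JOB LISTING IS ASSUMED TO PRINT BLOCKS OF THE FORM
    # "Jobid: NNN ... Status: <status> submitted: MM/DD/YYYY HH:MM ... Completed";
    # THE REGEX ABOVE PULLS OUT THE JOB ID, STATUS AND SUBMISSION TIME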
    now = datetime.utcnow()
    thisIter = regex.finditer(stdout)
    for item in thisIter:
        thisId = int(item.group("jobid"))
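        # NOTE: THE SUBMITTED TIMESTAMP REPORTED BY casjobs APPEARS TO BE IN
        # THE SERVER'S LOCAL, US-FORMATTED TIME; THE MONTH/HOUR ADJUSTMENTS
        # BELOW LOOK LIKE A ROUGH CONVERSION TO UTC (AN ASSUMPTION - THE
        # ORIGINAL CODE GIVES NO EXPLANATION). JOBS RUNNING LONGER THAN ~3
        # HOURS ARE FLAGGED AS FAILED SO THEIR SLOT CAN BE RE-QUERIED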
        try:
            jobStart = datetime.strptime(
                item.group("datetime"), '%m/%d/%Y %H:%M')
            jobStart = jobStart + relativedelta(months=1)
        except:
            jobStart = datetime.strptime(item.group(
                "datetime").replace("0/", "1/", 1), '%m/%d/%Y %H:%M')
        if now.hour > jobStart.hour:
            jobStart = jobStart + relativedelta(hours=12)
        jobStart = jobStart - relativedelta(hours=1)
        diff = now - jobStart
        diffMin = diff.days * 24 * 60 + diff.seconds / 60
        if diffMin > 180:
            for k, v in bookkeeper["jobs"].iteritems():
                if bookkeeper["jobs"][k]["attr_job_id"] == thisId:
                    bookkeeper["jobs"][k]["status"] = "failed"
                if bookkeeper["jobs"][k]["thin_job_id"] == thisId:
                    bookkeeper["jobs"][k]["status"] = "failed"
                # print "open http://mastweb.stsci.edu/ps1casjobs/cancelJob.aspx?id=%(thisId)s&CancelJob=Cancel+Job" % locals()

    # UPDATE BOOKKEEPER
    stream = file('bookkeeper.yaml', 'w')
    yaml.dump(bookkeeper, stream, default_flow_style=False)
    stream.close()
    time.sleep(1)

    log.debug('completed the ``timeout_long_running_jobs`` function')
    return None


def submit_query(
        log,
        query,
        uuid,
        scheme="PanSTARRS_DR1"):
    """*submit a query and return the job id*

    **Key Arguments:**
        - ``log`` -- logger
        - ``query`` -- the MAST casjobs query to submit
        - ``uuid`` -- the UUID with which to name the query and resulting database table
        - ``scheme`` -- the scheme to query

    **Return:**
        - ``jobID`` -- the job id from the submitted query
    """
    log.debug('starting the ``submit_query`` function')
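    # ON A SUCCESSFUL SUBMISSION THE casjobs.jar CLI IS ASSUMED TO ECHO A LINE
    # LIKE "JobID is 1234567"; THE REGEX BELOW PICKS THE NUMBER OUT AND None
    # IS RETURNED IF NO SUCH LINE IS FOUND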
query = """java -jar /usr/local/bin/casjobs.jar submit -t %(scheme)s -n "%(uuid)s" "%(query)s" """ % locals( | |
) | |
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True) | |
stdout, stderr = p.communicate() | |
log.debug('output: %(stdout)s' % locals()) | |
matchObject = re.search(r"JobID is (\d+)", stdout) | |
if matchObject: | |
jobId = matchObject.group(1) | |
else: | |
jobId = None | |
log.debug('completed the ``submit_query`` function') | |
return jobId | |


def get_next_maxid(
        log):
    """*get the next maximum ID for the next batch of rows to request from MAST*

    **Key Arguments:**
        - ``log`` -- logger

    **Return:**
        - ``maxid`` -- the maximum ID of the next batch of rows to request
    """
    log.debug('starting the ``get_next_maxid`` function')
    global querySize

    # READ BOOKKEEPER
    stream = file('bookkeeper.yaml', 'r')
    bookkeeper = yaml.load(stream)
    stream.close()

    # NUMBER OF ROWS PER FILE
    numberRows = querySize
    oldMaxId = int(bookkeeper["master_max_objid"])
    offset = numberRows - 1
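    # THE QUERY BELOW SKIPS `querySize - 1` PRIMARY DETECTIONS PAST THE
    # CURRENT MASTER MAXIMUM AND RETURNS THE SINGLE objID THAT FOLLOWS; THAT
    # objID BECOMES THE UPPER BOUND OF THE NEXT BATCH. THE RESULT IS WRITTEN
    # TO A TEMPORARY MyDB TABLE CALLED `maxid`, READ BACK, AND THEN DROPPED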
query = """SELECT objID as maxId into myDB.maxid from StackObjectAttributes WHERE primaryDetection = 1 and objid > %(oldMaxId)s order by objid offset %(offset)s rows fetch next 1 rows only""" % locals( | |
) | |
jobId = submit_query(log=log, query=query, uuid="maxid") | |
waiting = True | |
while waiting: | |
time.sleep(10) | |
status = query_job_status(log, jobId) | |
if status.lower() == "complete": | |
waiting = False | |
query = """java -jar /usr/local/bin/casjobs.jar execute -t "MyDB" "select maxId from maxid" """ | |
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True) | |
stdout, stderr = p.communicate() | |
log.debug('output: %(stdout)s' % locals()) | |
matchObject = re.search(r"\d{5,30}", stdout) | |
if matchObject: | |
maxid = matchObject.group() | |
else: | |
maxid = None | |
elif status.lower() == "failed": | |
log.error('cound not get maxid') | |
waiting = False | |
maxid = None | |
# TEST TABLE EXISTS | |
query = """java -jar /usr/local/bin/casjobs.jar tables """ % locals( | |
) | |
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True) | |
stdout, stderr = p.communicate() | |
if "maxid" in stdout.lower(): | |
query = """drop table maxid""" % locals( | |
) | |
jobId = submit_query( | |
log=log, query=query, uuid=pyuuid.uuid1(), scheme="MyDB") | |
waiting = True | |
while waiting: | |
time.sleep(10) | |
status = query_job_status(log, jobId) | |
if status.lower() == "complete": | |
waiting = False | |
log.debug('completed the ``get_next_maxid`` function') | |
return maxid | |


def query_job_status(
        log,
        jobId):
    """*query the status of a job*

    **Key Arguments:**
        - ``log`` -- logger
        - ``jobId`` -- the casjob job id to request a status for

    **Return:**
        - ``status`` -- the status of the job
    """
    log.debug('starting the ``query_job_status`` function')

    status = None
    attempt = 0
    while status == None and attempt != 3:
        query = """ java -jar /usr/local/bin/casjobs.jar jobs -j %(jobId)s """ % locals()
        log.debug("""query: `%(query)s`""" % locals())
        p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
        stdout, stderr = p.communicate()
        log.debug('output: %(stdout)s' % locals())
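        # casjobs REPORTS "failed" FOR A "SELECT ... INTO" QUERY WHOSE TARGET
        # TABLE ALREADY EXISTS ("There is already an object ..."); IN THAT
        # CASE THE DATA IS ALREADY SITTING IN MyDB FROM A PREVIOUS ATTEMPT,
        # SO THE JOB IS TREATED AS COMPLETE BELOW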
        matchObject = re.search(r"Status: (\S+)", stdout)
        if matchObject:
            status = matchObject.group(1)
            if status == "failed" and "There is already an object" in stdout:
                status = "complete"
        else:
            log.error('could not find status of jobid %(jobId)s' % locals())
            attempt += 1
            time.sleep(1)

    log.debug('completed the ``query_job_status`` function')
    return status


if __name__ == '__main__':
    main()