Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@thespacedoctor
Last active May 5, 2021 15:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thespacedoctor/9c41f8417452719fba248ac3ddd4325c to your computer and use it in GitHub Desktop.
Save thespacedoctor/9c41f8417452719fba248ac3ddd4325c to your computer and use it in GitHub Desktop.
[PanSTARRS DR1 Downloader] Download the StackObjectThin and StackObjectAttributes Tables from MAST #panstarrs #dr1 #casjobs #sherlock
#!/usr/local/bin/python
# encoding: utf-8
"""
*Download the PanSTARRS DR1 Catalogue from MAST via CasJobs into FITS files containing 500,000 rows each*
:Author:
David Young
:Date Created:
June 18, 2018
Usage:
panstarrs_dr1_downloader
Options:
-h, --help show this help message
"""
################# GLOBAL IMPORTS ####################
import sys
import os
from fundamentals import tools
import yaml
import urllib
from subprocess32 import Popen, PIPE, STDOUT, TimeoutExpired
import uuid as pyuuid
import re
import requests
import shutil
from fundamentals.download import multiobject_download
import time
from datetime import datetime, date
from dateutil.relativedelta import relativedelta
import uuid as pyuuid
querySize = 2500000
def main(arguments=None):
"""
*The main function used when ``panstarrs_dr1_downloader.py`` is run as a single script from the cl*
"""
# SETUP THE COMMAND-LINE UTIL SETTINGS
su = tools(
arguments=arguments,
docString=__doc__,
logLevel="INFO",
options_first=False,
projectName=False
)
arguments, settings, log, dbConn = su.setup()
# UNPACK REMAINING CL ARGUMENTS USING `EXEC` TO SETUP THE VARIABLE NAMES
# AUTOMATICALLY
for arg, val in arguments.iteritems():
if arg[0] == "-":
varname = arg.replace("-", "") + "Flag"
else:
varname = arg.replace("<", "").replace(">", "")
if isinstance(val, str) or isinstance(val, unicode):
exec(varname + " = '%s'" % (val,))
else:
exec(varname + " = %s" % (val,))
if arg == "--dbConn":
dbConn = val
log.debug('%s = %s' % (varname, val,))
# GENERATE BOOKKEEPING YAML DATABASE
bkfileName = "bookkeeper.yaml"
exists = os.path.exists(bkfileName)
if not exists:
bookkeeper = {}
joblist = ["one", "two", "three", "four",
"five", "six", "seven", "eight"]
bookkeeper["jobs"] = {}
for j in joblist:
bookkeeper["jobs"][j] = {
"max_objid": 0,
"thin_job_id": 0,
"attr_job_id": None,
"status": None
}
bookkeeper["master_max_objid"] = 0
bookkeeper["rows_downloaded"] = 0
stream = file(bkfileName, 'w')
yaml.dump(bookkeeper, stream, default_flow_style=False)
stream.close()
time.sleep(1)
# OPEN BOOKKEEPER
stream = file('bookkeeper.yaml', 'r')
bookkeeper = yaml.load(stream)
# LOOP THROUGH OVER QUERIES AND DOWNLOADS UNTIL HIGHEST OBJID IS REACHED
while bookkeeper["master_max_objid"] < 215993524933671565:
# OPEN BOOKKEEPER
log.info('RESTARTING THE LOOP' % locals())
stream = file('bookkeeper.yaml', 'r')
bookkeeper = yaml.load(stream)
stream.close()
log.info('1. starting new queries' % locals())
start_new_queries(log)
log.info('2. extracting query results from MyDB' % locals())
extract_results(log)
log.info('3. dropping the MyDB tables' % locals())
drop_tables(log)
log.info('4. downloading the output fits files' % locals())
download_results(log)
log.info('5. restarting failed jobs' % locals())
restart_failed_jobs(log)
log.info('6. killing timed out jobs' % locals())
timeout_long_running_jobs(log)
# INCREMENTAL BOOKKEEPER BACKUP
now = datetime.now()
now = now.strftime("%Y%m%dt%H")
shutil.copyfile('bookkeeper.yaml',
'bookkeeper_%(now)s.yaml' % locals())
return
def current_jobs(
log):
"""*check casjobs for any currently running jobs and return job IDs*
**Key Arguments:**
- ``log`` -- logger
**Return:**
- ``jobs`` -- list of currently running job IDs
"""
log.debug('starting the ``current_jobs`` function')
cmd = """java -jar /usr/local/bin/casjobs.jar jobs -status running""" % locals()
p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = p.communicate()
log.debug('output: %(stdout)s' % locals())
if "no jobs found" not in stdout.lower():
jobs = []
regex = re.compile(r'JobID: (\d+)')
regexIterator = regex.finditer(stdout)
for match in regexIterator:
jobs.append(int(match.group(1)))
else:
jobs = []
log.debug('completed the ``current_jobs`` function')
return jobs
def start_new_queries(
log):
"""*Find empty jobs and start new queries*
**Key Arguments:**
- ``log`` -- logger
**Return:**
- None
"""
log.debug('starting the ``start_new_queries`` function')
global querySize
# READ BOOKKEEPER
stream = file('bookkeeper.yaml', 'r')
bookkeeper = yaml.load(stream)
stream.close()
# NUMBER OF ROWS PER FILE
numberRows = querySize
# ITERATE OVER 8 JOBS AND TRIGGER NEW QUERIES IF THERE IS SPACE AVAILABLE
# (i.e STATUS IS COMPLETE)
for k, v in bookkeeper["jobs"].iteritems():
if not v["status"] or v["status"] == "complete":
# USE THE OLD MAX OBJID FOR QUERY LOWER LIMIT - COLLECT NEW MAX ID
# 500,000 ROWS ON
oldMaxId = int(bookkeeper["master_max_objid"])
newMaxId = None
while newMaxId == None:
newMaxId = get_next_maxid(log)
# NEW STACK ATTR QUERY TO COLLECT 500,000 ROWS
uuid = "a" + newMaxId
query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, gpsfLikelihood, gmomentR1, gmomentRH, gKronRad, gExtNSigma, rpsfLikelihood, rmomentR1, rmomentRH, rKronRad, rExtNSigma, ipsfLikelihood, imomentR1, imomentRH, iKronRad, iExtNSigma, zpsfLikelihood, zmomentR1, zmomentRH, zKronRad, zExtNSigma, ypsfLikelihood, ymomentR1, ymomentRH, yKronRad, yExtNSigma from StackObjectAttributes WHERE primaryDetection = 1 and objid > %(oldMaxId)s order by objid""" % locals()
jobId = submit_query(log=log, query=query, uuid=uuid)
bookkeeper["jobs"][k]["attr_job_id"] = int(jobId)
time.sleep(1)
# NEW STACK THIN QUERY TO COLLECT 500,000 ROWS
uuid = "t" + newMaxId
query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, primaryDetection, bestDetection, gra, gdec, gPSFMag, gPSFMagErr, gApMag, gApMagErr, gKronMag, gKronMagErr, ginfoFlag, ginfoFlag2, ginfoFlag3, rra, rdec, rPSFMag, rPSFMagErr, rApMag, rApMagErr, rKronMag, rKronMagErr, rinfoFlag, rinfoFlag2, rinfoFlag3, ira, idec, iPSFMag, iPSFMagErr, iApMag, iApMagErr, iKronMag, iKronMagErr, iinfoFlag, iinfoFlag2, iinfoFlag3, zra, zdec, zPSFMag, zPSFMagErr, zApMag, zApMagErr, zKronMag, zKronMagErr, zinfoFlag, zinfoFlag2, zinfoFlag3, yra, ydec, yPSFMag, yPSFMagErr, yApMag, yApMagErr, yKronMag, yKronMagErr, yinfoFlag, yinfoFlag2, yinfoFlag3 from StackObjectThin WHERE primaryDetection = 1 and objid > %(oldMaxId)s order by objid""" % locals()
jobId = submit_query(log=log, query=query, uuid=uuid)
bookkeeper["jobs"][k]["thin_job_id"] = int(jobId)
bookkeeper["jobs"][k]["status"] = "running"
bookkeeper["jobs"][k]["max_objid"] = int(newMaxId)
bookkeeper["jobs"][k]["min_objid"] = oldMaxId
bookkeeper["master_max_objid"] = int(newMaxId)
# UPDATE BOOKKEEPING FILE WITH THE QUERY JOB IDS
stream = file('bookkeeper.yaml', 'w')
yaml.dump(bookkeeper, stream, default_flow_style=False)
stream.close()
time.sleep(1)
log.debug('completed the ``start_new_queries`` function')
return None
def extract_results(
log):
"""*check on running jobs and request to extract the results for those jobs that have finished*
**Key Arguments:**
- ``log`` -- logger
**Return:**
- None
"""
log.debug('starting the ``extract_results`` function')
# READ BOOKKEEPER
stream = file('bookkeeper.yaml', 'r')
bookkeeper = yaml.load(stream)
stream.close()
# ITERATE THORUGH JOBS - IF RUNNING CHECK IF JOBS IS COMPLETE.
for k, v in bookkeeper["jobs"].iteritems():
if v["status"] == "running":
for thisId in [v["attr_job_id"], v["thin_job_id"]]:
extract = True
status = query_job_status(log=log, jobId=thisId)
if status == "failed" or status == None:
extract = False
bookkeeper["jobs"][k]["status"] = "failed"
if status != "complete":
extract = False
break
# IF BOTH THIN AND ATTR QUERIES COMPLETE THEN TRIGGER EXTRACT TO
# FITS
if extract:
# BOTH JOBS COMPLETE - EXTRACT
maxId = int(v["max_objid"])
query = "java -jar /usr/local/bin/casjobs.jar extract -a FITS -b a%(maxId)s" % locals(
)
os.system(query)
query = "java -jar /usr/local/bin/casjobs.jar extract -a FITS -b t%(maxId)s" % locals(
)
os.system(query)
bookkeeper["jobs"][k]["status"] = "extracting"
# UPDATE BOOKKEEPER
stream = file('bookkeeper.yaml', 'w')
yaml.dump(bookkeeper, stream, default_flow_style=False)
stream.close()
time.sleep(1)
log.debug('completed the ``extract_results`` function')
return None
def download_results(
log):
"""*download the FITS files for tables that have been extracted and dropped*
**Key Arguments:**
- ``log`` -- logger
**Return:**
- None
"""
log.debug('starting the ``download_results`` function')
global querySize
# READ BOOKKEEPER
stream = file('bookkeeper.yaml', 'r')
bookkeeper = yaml.load(stream)
stream.close()
downloads = []
filenames = []
jobNumbers = []
found = None
for k, v in bookkeeper["jobs"].iteritems():
if v["status"] == "dropping" and not found:
maxId = v["max_objid"]
urls = ["a", "t"]
downloaded = True
for u in urls:
if u == 't':
thisId = v["thin_job_id"]
else:
thisId = v["attr_job_id"]
status = query_job_status(log=log, jobId=thisId)
if status != "complete":
downloaded = False
break
# IF TABLE IS DROPPED EXPORT RESULTS
local_filename = "%(u)s%(maxId)0.18d.fits" % locals()
url = "http://ps1images.stsci.edu/datadelivery/outgoing/casjobs/fits/%(u)s%(maxId)s_daveyoung.fit" % locals(
)
downloads.append(url)
filenames.append(local_filename)
jobNumbers.append(k)
found = True
if len(filenames):
this = (" and ").join(filenames)
print "Downloading %(this)s" % locals()
localUrls = multiobject_download(
urlList=downloads,
downloadDirectory="./",
log=log,
timeStamp=False,
timeout=500,
concurrentDownloads=2,
resetFilename=filenames,
credentials=False,
longTime=False,
indexFilenames=False
)
downloaded = []
for l, j in zip(localUrls, jobNumbers):
if l and j in downloaded:
bookkeeper["jobs"][j]["status"] = "complete"
bookkeeper["rows_downloaded"] = int(
bookkeeper["rows_downloaded"]) + querySize
elif l:
downloaded.append(j)
elif not l and j in downloaded:
bookkeeper["jobs"][j]["status"] = "onefailed"
for j in jobNumbers:
if j not in downloaded:
bookkeeper["jobs"][j]["status"] = "failed"
# UPDATE BOOKKEEPER
stream = file('bookkeeper.yaml', 'w')
yaml.dump(bookkeeper, stream, default_flow_style=False)
stream.close()
time.sleep(1)
log.debug('completed the ``download_results`` function')
return None
def drop_tables(
log):
"""*if tables are being extracted try and drop the database tables*
**Key Arguments:**
- ``log`` -- logger
**Return:**
- None
"""
log.debug('starting the ``drop_tables`` function')
# READ BOOKKEEPER
stream = file('bookkeeper.yaml', 'r')
bookkeeper = yaml.load(stream)
stream.close()
# TEST IF PREVIOUS DROPS WORKED - IF NOT TRY AGAIN
for k, v in bookkeeper["jobs"].iteritems():
if v["status"] == "dropping":
maxId = v["max_objid"]
urls = ["a", "t"]
for u in urls:
if u == 't':
thisId = v["thin_job_id"]
else:
thisId = v["attr_job_id"]
status = query_job_status(log=log, jobId=thisId)
if status == None:
bookkeeper["jobs"][k]["status"] = "failed"
elif status == "failed":
# TEST TABLE EXISTS
query = """java -jar /usr/local/bin/casjobs.jar tables """ % locals(
)
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = p.communicate()
if "%(u)s%(maxId)s" % locals() not in stdout:
bookkeeper["jobs"][k]["status"] = "failed"
else:
# SHOULD NOT BE ABLE TO DROP TABLE IF EXPORT IS
# ONGOING
query = """drop table %(u)s%(maxId)s""" % locals(
)
jobId = submit_query(
log=log, query=query, uuid=pyuuid.uuid1(), scheme="MyDB")
if u == "a":
bookkeeper["jobs"][k][
"attr_job_id"] = int(jobId)
else:
bookkeeper["jobs"][k][
"thin_job_id"] = int(jobId)
# UPDATE BOOKKEEPER
stream = file('bookkeeper.yaml', 'w')
yaml.dump(bookkeeper, stream, default_flow_style=False)
stream.close()
time.sleep(1)
# FOR TABLES THAT ARE BEING EXTRACTED TRY AND DUMP THE TABLE - WILL FAIL
# IF STILL EXTRACTING
for k, v in bookkeeper["jobs"].iteritems():
if v["status"] == "extracting":
maxId = v["max_objid"]
prefix = ["a", "t"]
for u in prefix:
# SHOULD NOT BE ABLE TO DROP TABLE IF EXPORT IS ONGOING
query = """drop table %(u)s%(maxId)s""" % locals(
)
jobId = submit_query(log=log, query=query,
uuid=pyuuid.uuid1(), scheme="MyDB")
if u == "a":
bookkeeper["jobs"][k]["attr_job_id"] = int(jobId)
else:
bookkeeper["jobs"][k]["thin_job_id"] = int(jobId)
bookkeeper["jobs"][k]["status"] = "dropping"
# UPDATE BOOKKEEPER
stream = file('bookkeeper.yaml', 'w')
yaml.dump(bookkeeper, stream, default_flow_style=False)
stream.close()
time.sleep(1)
log.debug('completed the ``drop_tables`` function')
return None
def restart_failed_jobs(
log):
"""*try and restart failed queries*
**Key Arguments:**
- ``log`` -- logger
**Return:**
- None
"""
log.debug('starting the ``restart_failed_jobs`` function')
global querySize
# NUMBER OF ROWS PER FILE
numberRows = querySize
# READ BOOKKEEPER
stream = file('bookkeeper.yaml', 'r')
bookkeeper = yaml.load(stream)
stream.close()
# RESTART FAILED JOBS
for k, v in bookkeeper["jobs"].iteritems():
if v["status"] == "failed":
maxId = v["max_objid"]
minId = int(v["min_objid"])
urls = ["a", "t"]
# MAKE SURE BOTH JOBS ARE COMPLETED/FAILED
completed = True
for u in urls:
if u == 't':
thisId = v["thin_job_id"]
else:
thisId = v["attr_job_id"]
status = query_job_status(log, thisId)
if status and status not in ["failed", "complete"]:
completed = False
print status, status, status
if not completed:
continue
tableExists = True
while tableExists:
tableExists = False
for u in urls:
if u == 't':
thisId = v["thin_job_id"]
else:
thisId = v["attr_job_id"]
# TEST IF TABLE EXISTS
query = """java -jar /usr/local/bin/casjobs.jar tables """ % locals(
)
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = p.communicate()
thisTable = "%(u)s%(maxId)s" % locals()
if thisTable in stdout:
# TRY AND DROP TABLE
tableExists = True
exists = True
query = """java -jar /usr/local/bin/casjobs.jar execute -t "MyDB" "drop table %(thisTable)s" """ % locals(
)
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = p.communicate()
time.sleep(1)
# NEW STACK ATTR QUERY TO COLLECT 500,000 ROWS
uuid = "a" + str(maxId)
query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, gpsfLikelihood, gmomentR1, gmomentRH, gKronRad, gExtNSigma, rpsfLikelihood, rmomentR1, rmomentRH, rKronRad, rExtNSigma, ipsfLikelihood, imomentR1, imomentRH, iKronRad, iExtNSigma, zpsfLikelihood, zmomentR1, zmomentRH, zKronRad, zExtNSigma, ypsfLikelihood, ymomentR1, ymomentRH, yKronRad, yExtNSigma from StackObjectAttributes WHERE primaryDetection = 1 and objid > %(minId)s order by objid """ % locals()
jobId = submit_query(log=log, query=query, uuid=uuid)
bookkeeper["jobs"][k]["attr_job_id"] = int(jobId)
time.sleep(1)
# NEW STACK THIN QUERY TO COLLECT 500,000 ROWS
uuid = "t" + str(maxId)
query = """SELECT top %(numberRows)s into myDB.%(uuid)s objID, uniquePspsSTid, primaryDetection, bestDetection, gra, gdec, gPSFMag, gPSFMagErr, gApMag, gApMagErr, gKronMag, gKronMagErr, ginfoFlag, ginfoFlag2, ginfoFlag3, rra, rdec, rPSFMag, rPSFMagErr, rApMag, rApMagErr, rKronMag, rKronMagErr, rinfoFlag, rinfoFlag2, rinfoFlag3, ira, idec, iPSFMag, iPSFMagErr, iApMag, iApMagErr, iKronMag, iKronMagErr, iinfoFlag, iinfoFlag2, iinfoFlag3, zra, zdec, zPSFMag, zPSFMagErr, zApMag, zApMagErr, zKronMag, zKronMagErr, zinfoFlag, zinfoFlag2, zinfoFlag3, yra, ydec, yPSFMag, yPSFMagErr, yApMag, yApMagErr, yKronMag, yKronMagErr, yinfoFlag, yinfoFlag2, yinfoFlag3 from StackObjectThin WHERE primaryDetection = 1 and objid > %(minId)s order by objid""" % locals()
jobId = submit_query(log=log, query=query, uuid=uuid)
bookkeeper["jobs"][k]["thin_job_id"] = int(jobId)
bookkeeper["jobs"][k]["status"] = "running"
# UPDATE BOOKKEEPER
stream = file('bookkeeper.yaml', 'w')
yaml.dump(bookkeeper, stream, default_flow_style=False)
stream.close()
time.sleep(1)
log.debug('completed the ``restart_failed_jobs`` function')
return None
# use the tab-trigger below for new function
def timeout_long_running_jobs(
log):
"""*check for jobs that have been running/stuck for a long peroid of time and cancel them to unclog the queue*
**Key Arguments:**
- ``log`` -- logger
**Return:**
- None
"""
log.debug('starting the ``timeout_long_running_jobs`` function')
# READ BOOKKEEPER
stream = file('bookkeeper.yaml', 'r')
bookkeeper = yaml.load(stream)
stream.close()
# GRAB ALL RUNNING JOBS
query = """java -jar /usr/local/bin/casjobs.jar j -t started """ % locals(
)
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = p.communicate()
regex = re.compile(
r'Jobid: (?P<jobid>\d+).*?\nStatus:\s+(?P<status>.*?)\s+submitted: (?P<datetime>.*?)\s+Completed', re.S | re.I)
now = datetime.utcnow()
thisIter = regex.finditer(stdout)
for item in thisIter:
thisId = item.group("jobid")
try:
jobStart = datetime.strptime(
item.group("datetime"), '%m/%d/%Y %H:%M')
jobStart = jobStart + relativedelta(months=1)
except:
jobStart = datetime.strptime(item.group(
"datetime").replace("0/", "1/", 1), '%m/%d/%Y %H:%M')
if now.hour > jobStart.hour:
jobStart = jobStart + relativedelta(hours=12)
jobStart = jobStart - relativedelta(hours=1)
diff = now - jobStart
diffMin = diff.days / (24 * 60) + diff.seconds / 60
if diffMin > 180:
for k, v in bookkeeper["jobs"].iteritems():
if bookkeeper["jobs"][k]["attr_job_id"] == thisId:
bookkeeper["jobs"][k]["status"] = "failed"
if bookkeeper["jobs"][k]["thin_job_id"] == thisId:
bookkeeper["jobs"][k]["status"] = "failed"
# print "open
# http://mastweb.stsci.edu/ps1casjobs/cancelJob.aspx?id=%(thisId)s&CancelJob=Cancel+Job"
# % locals()
# UPDATE BOOKKEEPER
stream = file('bookkeeper.yaml', 'w')
yaml.dump(bookkeeper, stream, default_flow_style=False)
stream.close()
time.sleep(1)
log.debug('completed the ``timeout_long_running_jobs`` function')
return None
def submit_query(
log,
query,
uuid,
scheme="PanSTARRS_DR1"):
"""*submit a query and return the job id*
**Key Arguments:**
- ``log`` -- logger
- ``query`` -- the MAST casjob query to submit
- ``uuid`` -- the UUID with which to name the query and resulting database table
- ``scheme`` -- the scheme to query
**Return:**
- ``jobID`` -- the job id from the submitted query
"""
log.debug('starting the ``submit_query`` function')
query = """java -jar /usr/local/bin/casjobs.jar submit -t %(scheme)s -n "%(uuid)s" "%(query)s" """ % locals(
)
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = p.communicate()
log.debug('output: %(stdout)s' % locals())
matchObject = re.search(r"JobID is (\d+)", stdout)
if matchObject:
jobId = matchObject.group(1)
else:
jobId = None
log.debug('completed the ``submit_query`` function')
return jobId
# use the tab-trigger below for new function
def get_next_maxid(
log):
"""*get the next maximum ID for the next batch of rows to request from MAST*
**Key Arguments:**
- ``log`` -- logger
**Return:**
- ``maxid`` -- the maximum ID of the next batch of rows to request
"""
log.debug('starting the ``get_next_maxid`` function')
global querySize
# READ BOOKKEEPER
stream = file('bookkeeper.yaml', 'r')
bookkeeper = yaml.load(stream)
stream.close()
# NUMBER OF ROWS PER FILE
numberRows = querySize
oldMaxId = int(bookkeeper["master_max_objid"])
offset = numberRows - 1
query = """SELECT objID as maxId into myDB.maxid from StackObjectAttributes WHERE primaryDetection = 1 and objid > %(oldMaxId)s order by objid offset %(offset)s rows fetch next 1 rows only""" % locals(
)
jobId = submit_query(log=log, query=query, uuid="maxid")
waiting = True
while waiting:
time.sleep(10)
status = query_job_status(log, jobId)
if status.lower() == "complete":
waiting = False
query = """java -jar /usr/local/bin/casjobs.jar execute -t "MyDB" "select maxId from maxid" """
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = p.communicate()
log.debug('output: %(stdout)s' % locals())
matchObject = re.search(r"\d{5,30}", stdout)
if matchObject:
maxid = matchObject.group()
else:
maxid = None
elif status.lower() == "failed":
log.error('cound not get maxid')
waiting = False
maxid = None
# TEST TABLE EXISTS
query = """java -jar /usr/local/bin/casjobs.jar tables """ % locals(
)
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = p.communicate()
if "maxid" in stdout.lower():
query = """drop table maxid""" % locals(
)
jobId = submit_query(
log=log, query=query, uuid=pyuuid.uuid1(), scheme="MyDB")
waiting = True
while waiting:
time.sleep(10)
status = query_job_status(log, jobId)
if status.lower() == "complete":
waiting = False
log.debug('completed the ``get_next_maxid`` function')
return maxid
def query_job_status(
log,
jobId):
"""*query the status of a job*
**Key Arguments:**
- ``log`` -- logger
- ``jobId`` -- the casjob job id to request a status for
**Return:**
- ``status`` -- the status of the job
"""
log.debug('starting the ``query_job_status`` function')
status = None
attempt = 0
while status == None and attempt != 3:
query = """ java -jar /usr/local/bin/casjobs.jar jobs -j %(jobId)s """ % locals(
)
log.debug("""query: `%(query)s`""" % locals())
p = Popen(query, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = p.communicate()
log.debug('output: %(stdout)s' % locals())
matchObject = re.search(r"Status: (\S+)", stdout)
if matchObject:
status = matchObject.group(1)
if status == "failed" and "There is already an object" in stdout:
status = "complete"
else:
log.error('cound not find status of jobid %(jobId)s' % locals())
attempt += 1
time.sleep(1)
log.debug('completed the ``query_job_status`` function')
return status
# use the tab-trigger below for new function
# xt-def-function
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment