Skip to content

Instantly share code, notes, and snippets.

@bmorris3
Last active June 3, 2019 01:05
Show Gist options
  • Save bmorris3/b67d0a2e9c8c3e7db014e4bce2615076 to your computer and use it in GitHub Desktop.
Save bmorris3/b67d0a2e9c8c3e7db014e4bce2615076 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""skycalc Module"""
from __future__ import print_function
import sys
import json
from datetime import datetime
import requests
class AlmanacQuery:
"""AlmanacQuery for Querying the SkyCalc Almanac"""
def __init__(self, indic):
self.almdata = None
self.almserver = 'http://etimecalret-001.eso.org'
self.almurl = '/observing/etc/api/skycalc_almanac'
# Left: users keyword (skycalc_cli),
# Right: skycalc Almanac output keywords
self.alm_parameters = {}
self.alm_parameters['airmass'] = 'target_airmass'
self.alm_parameters['msolflux'] = 'sun_aveflux'
self.alm_parameters['moon_sun_sep'] = 'moon_sun_sep'
self.alm_parameters['moon_target_sep'] = 'moon_target_sep'
self.alm_parameters['moon_alt'] = 'moon_alt'
self.alm_parameters['moon_earth_dist'] = 'moon_earth_dist'
self.alm_parameters['ecl_lon'] = 'ecl_lon'
self.alm_parameters['ecl_lat'] = 'ecl_lat'
self.alm_parameters['observatory'] = 'observatory'
self.almindic = {}
# The Almanac needs:
# coord_ra : float [deg]
# coord_dec : float [deg]
# input_type : ut_time | local_civil_time | mjd
# mjd : float
# coord_year : int
# coord_month : int
# coord_day : int
# coord_ut_hour : int
# coord_ut_min : int
# coord_ut_sec : float
if 'date' in indic:
self.almindic['input_type'] = 'ut_time'
isotime = None
try:
isotime = datetime.strptime(indic['date'],
'%Y-%m-%dT%H:%M:%S')
except ValueError:
print('Error: wrong date format for the Almanac.')
raise
self.almindic['coord_year'] = isotime.year
self.almindic['coord_month'] = isotime.month
self.almindic['coord_day'] = isotime.day
self.almindic['coord_ut_hour'] = isotime.hour
self.almindic['coord_ut_min'] = isotime.minute
self.almindic['coord_ut_sec'] = isotime.second
elif 'mjd' in indic:
self.almindic['input_type'] = 'mjd'
mjd = None
try:
mjd = float(indic['mjd'])
except ValueError:
print('Error: wrong mjd format for the Almanac.')
raise
self.almindic['mjd'] = mjd
else:
raise ValueError('Error: no date or mjd given for the Almanac')
if 'ra' not in indic:
raise ValueError('Error: ra coordinate not given for the Almanac.')
if 'dec' not in indic:
raise ValueError('Error: dec coordinate '
'not given for the Almanac.')
ra = None
try:
ra = float(indic['ra'])
except ValueError:
print('Error: wrong ra format for the Almanac.')
raise
self.almindic['coord_ra'] = ra
dec = None
try:
dec = float(indic['dec'])
except ValueError:
print('Error: wrong dec format for the Almanac.')
raise
self.almindic['coord_dec'] = dec
if 'observatory' in indic:
self.almindic['observatory'] = indic['observatory']
def query(self):
url = self.almserver + self.almurl
rawdata = None
try:
response = requests.post(url, json.dumps(self.almindic))
rawdata = response.text
except requests.exceptions.RequestException as e:
print('Error: Almanac query failed.')
raise
# Process rawdata
jsondata = None
try:
jsondata = json.loads(rawdata)
jsondata = jsondata['output']
except (KeyError, ValueError):
print('Error: invalid Almanac response.')
raise
# Find the relevant (key, value)
almdata = {}
for key, value in self.alm_parameters.items():
subsection = 'nothing'
prefix = value.split('_')[0]
if prefix == 'sun' or prefix == 'moon' or prefix == 'target':
subsection = prefix
elif prefix == 'ecl':
subsection = 'target'
else:
subsection = 'observation'
try:
almdata[key] = jsondata[subsection][value]
except (KeyError, ValueError):
print('Warning: key "' + subsection + '/' + value +
'" not found in the Almanac response.')
return almdata
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class SkyModel:
"""SkyModel for querying the Advanced SkyModel"""
def __init__(self):
self.stop_on_errors_and_exceptions = True
self.data = None
self.server = 'http://etimecalret-001.eso.org'
self.url = self.server + '/observing/etc/api/skycalc'
self.deleter_script_url = self.server + '/observing/etc/api/rmtmp'
self.bugreport_text = ''
self.tmpdir = ''
self.params = {
# Airmass. Alt and airmass are coupled through the plane parallel
# approximation airmass=sec(z), z being the zenith distance
# z=90°−Alt
'airmass': 1.0, # float range [1.0,3.0]
# Season and Period of Night
'pwv_mode': 'pwv', # string grid ['pwv','season']
# integer grid [0,1,2,3,4,5,6] (0=all year, 1=dec/jan,2=feb/mar...)
'season': 0,
# third of night integer grid [0,1,2,3] (0=all year, 1,2,3 = third
# of night)
'time': 0,
# Precipitable Water Vapor PWV
# mm float grid [-1.0,0.5,1.0,1.5,2.5,3.5,5.0,7.5,10.0,20.0]
'pwv': 3.5,
# Monthly Averaged Solar Flux
'msolflux': 130.0, # s.f.u float > 0
# Scattered Moon Light
# Moon coordinate constraints: |z – zmoon| ≤ ρ ≤ |z + zmoon| where
# ρ=moon/target separation, z=90°−target altitude and
# zmoon=90°−moon altitude.
# string grid ['Y','N'] flag for inclusion of scattered moonlight.
'incl_moon': 'Y',
# degrees float range [0.0,360.0] Separation of Sun and Moon as
# seen from Earth ("moon phase")
'moon_sun_sep': 90.0,
# degrees float range [0.0,180.0] Moon-Target Separation ( ρ )
'moon_target_sep': 45.0,
# degrees float range [-90.0,90.0] Moon Altitude over Horizon
'moon_alt': 45.0,
# float range [0.91,1.08] Moon-Earth Distance (mean=1)
'moon_earth_dist': 1.0,
# Starlight
# string grid ['Y','N'] flag for inclusion of scattered starlight
'incl_starlight': 'Y',
# Zodiacal light
# string grid ['Y','N'] flag for inclusion of zodiacal light
'incl_zodiacal': 'Y',
# degrees float range [-180.0,180.0] Heliocentric ecliptic
# longitude
'ecl_lon': 135.0,
# degrees float range [-90.0,90.0] Ecliptic latitude
'ecl_lat': 90.0,
# Molecular Emission of Lower Atmosphere
# string grid ['Y','N'] flag for inclusion of lower atmosphere
'incl_loweratm': 'Y',
# Emission Lines of Upper Atmosphere
# string grid ['Y','N'] flag for inclusion of upper stmosphere
'incl_upperatm': 'Y',
# Airglow Continuum (Residual Continuum)
# string grid ['Y','N'] flag for inclusion of airglow
'incl_airglow': 'Y',
# Instrumental Thermal Emission This radiance component represents
# an instrumental effect. The emission is provided relative to the
# other model components. To obtain the correct absolute flux, an
# instrumental response curve must be applied to the resulting
# model spectrum See section 6.2.4 in the documentation
# http://localhost/observing/etc/doc/skycalc/
# The_Cerro_Paranal_Advanced_Sky_Model.pdf
# string grid ['Y','N'] flag for inclusion of instrumental thermal
# radiation
'incl_therm': 'N',
'therm_t1': 0.0, # K float > 0
'therm_e1': 0.0, # float range [0,1]
'therm_t2': 0.0, # K float > 0
'therm_e2': 0.0, # float range [0,1]
'therm_t3': 0.0, # float > 0
'therm_e3': 0.0, # K float range [0,1]
# Wavelength Grid
'vacair': 'vac', # vac or air
'wmin': 300.0, # nm float range [300.0,30000.0] < wmax
'wmax': 2000.0, # nm float range [300.0,30000.0] > wmin
# string grid ['fixed_spectral_resolution','fixed_wavelength_step',
# 'user']
'wgrid_mode': 'fixed_wavelength_step',
# nm/step float range [0,30000.0] wavelength sampling step dlam
# (not the res.element)
'wdelta': 0.1,
# float range [0,1.0e6] RESOLUTION is misleading, it is rather
# lam/dlam where dlam is wavelength step (not the res.element)
'wres': 20000,
'wgrid_user': [500.0, 510.0, 520.0, 530.0, 540.0, 550.0],
# Convolve by Line Spread Function
'lsf_type': 'none', # string grid ['none','Gaussian','Boxcar']
'lsf_gauss_fwhm': 5.0, # wavelength bins float > 0
'lsf_boxcar_fwhm': 5.0, # wavelength bins float > 0
'observatory': 'paranal' # paranal
}
def handle_exception(self, e, msg):
print(msg)
print(e)
print(self.bugreport_text)
if(self.stop_on_errors_and_exceptions):
sys.exit()
# handle the kind of errors we issue ourselves.
def handle_error(self, msg, stop=True):
print(msg)
print(self.bugreport_text)
if(self.stop_on_errors_and_exceptions):
sys.exit()
def retrieve_data(self, url):
try:
response = requests.get(url, stream=True)
self.data = response.content
except requests.exceptions.RequestException as e:
self.handle_exception(
e, 'Exception raised trying to get FITS data from ' + url)
def write(self, local_filename):
try:
with open(local_filename, 'wb') as f:
f.write(self.data)
except IOError as e:
self.handle_exception(
e, 'Exception raised trying to write fits file ')
def getdata(self):
return self.data
def delete_server_tmpdir(self, tmpdir):
try:
response = requests.get(self.deleter_script_url + '?d=' + tmpdir)
deleter_response = response.text.strip()
if(deleter_response != 'ok'):
self.handle_error('Could not delete server tmpdir ' + tmpdir)
except requests.exceptions.RequestException as e:
pass # ignore the exception
def call(self, test=False):
# print 'self.url=',self.url
# print 'self.params=',self.params
try:
response = requests.post(self.url, data=json.dumps(self.params))
except requests.exceptions.RequestException as e:
self.handle_exception(
e, 'Exception raised trying to POST request ' + self.url)
try:
res = json.loads(response.text)
status = res['status']
tmpdir = res['tmpdir']
except (KeyError, ValueError) as e:
self.handle_exception(
e, 'Exception raised trying to decode server response ')
tmpurl = self.server + '/observing/etc/tmp/' + \
tmpdir + '/skytable.fits'
if(status == 'success'):
try:
# retrive and save FITS data (in memory)
self.retrieve_data(tmpurl)
except requests.exceptions.RequestException as e:
self.handle_exception(
e, 'could not retrieve FITS data from server')
self.delete_server_tmpdir(tmpdir)
else: # print why validation failed
self.handle_error('parameter validation error: ' +
res['error'])
if(test):
# print 'call() returning status:',status
return status
def callwith(self, newparams):
for key, val in newparams.items():
if key in self.params: # valid
self.params[key] = val
else: # invalid
print('callwith() ignoring invalid keyword: ', key)
self.call()
def printparams(self, keys=None):
if keys is None:
p = self.params
else:
p = dict((k, self.params[k]) for k in keys if k in self.params)
for k in p:
print(k, p[k])
def test(self, label, overwite_params):
from cStringIO import StringIO
# capture stdout for a while
old_stdout = sys.stdout
sys.stdout = mystdout = StringIO()
sm = SkyModel()
str_params = ''
# replace all matching keywords with the values in the newdict
for key, val in overwite_params.items():
if key in sm.params:
sm.params[key] = val
str_params = str_params + str(key) + ' : ' + str(val) + ', '
status = sm.call(True) # set True for test
# restore original stdout
sys.stdout = old_stdout
toprint = label + '\t'
# print str_params
if(status == 'success'):
toprint += bcolors.OKGREEN + ' *** pass *** ' + bcolors.ENDC
else:
toprint += bcolors.FAIL + ' *** fail *** ' + bcolors.ENDC
print(toprint)
print(mystdout.getvalue())
def reset(self):
self.__init__()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# European Southern Observatory
# For any scientific or technical question,
# please contact usd-help@eso.org
from __future__ import print_function
import pkg_resources
import sys
import json
import getopt
from .skycalc import SkyModel
from .skycalc import AlmanacQuery
def loadTxt(inputFile):
result = {}
lineN = 0
for line in inputFile.readlines():
lineN += 1
originalLine = line
line = line.strip()
if len(line) == 0 or line[0] == '#':
continue
line = line.split(':')
if len(line) > 2 and line[0].strip() == 'date':
line = [line[0], ':'.join(line[1::])]
elif len(line) != 2 or (len(line) == 2 and (line[0].strip() == '' or
line[1].strip() == '')):
print('WARNING: input file line ' + str(lineN) +
': wrong format: ignored.')
print(originalLine.rstrip())
continue
key = line[0].strip()
value = line[1].strip()
# Is it an integer?
try:
value = int(value)
except ValueError:
# Is it an float?
try:
value = float(value)
except ValueError:
pass
result[key] = value
return result
def fixObservatory(indic):
if 'observatory' in indic:
if indic['observatory'] == 'lasilla':
indic['observatory'] = '2400'
elif indic['observatory'] == 'paranal':
indic['observatory'] = '2640'
elif (indic['observatory'] == '3060m' or
indic['observatory'] == 'armazones'):
indic['observatory'] = '3060'
# elif indic['observatory'] == '5000m':
# indic['observatory'] = '5000'
else:
raise ValueError('Wrong Observatory name, please refer to the '
'documentation.')
return indic
def usage():
print('usage: skycalc_cli -i|--in inputfile -o|--out file.fits ' +
'[-a|--alm almanacparameterfile] [-v|--verbose] [--version]')
def main():
argv = sys.argv[1:]
almFilename = None
inputFilename = None
outputFilename = None
isVerbose = False
# Read command line arguments
try:
opts, args = getopt.getopt(argv, "hva:i:o:", ["help", "verbose",
"version",
"alm=", "in=", "out="])
except getopt.GetoptError:
usage()
sys.exit(1)
for opt, arg in opts:
if opt in ("-h", "--help"):
usage()
sys.exit()
elif opt in ("--version"):
version = pkg_resources.require("skycalc_cli")[0].version
print('skycalc_cli version ' + version)
sys.exit(0)
elif opt in ("-v", "--verbose"):
isVerbose = True
elif opt in ("-a", "--alm"):
almFilename = arg
elif opt in ("-i", "--in"):
inputFilename = arg
elif opt in ("-o", "--out"):
outputFilename = arg
if not ((inputFilename or almFilename) and outputFilename):
usage()
sys.exit(1)
dic = {}
# Query the Almanac if alm option is enabled
if almFilename:
# Read the input parameters
inputalmdic = None
try:
with open(almFilename, 'r') as f:
inputalmdic = json.load(f)
except ValueError:
with open(almFilename, 'r') as f:
inputalmdic = loadTxt(f)
if not inputalmdic:
raise ValueError('Error: cannot read' + almFilename)
alm = AlmanacQuery(inputalmdic)
dic = alm.query()
if isVerbose:
print('Data retrieved from the Almanac:')
for key, value in dic.items():
print('\t' + str(key) + ': ' + str(value))
if inputFilename:
# Read input parameters
inputdic = None
try:
with open(inputFilename, 'r') as f:
inputdic = json.load(f)
except ValueError:
with open(inputFilename, 'r') as f:
inputdic = loadTxt(f)
if not inputdic:
raise ValueError('Error: cannot read ' + inputFilename)
# Override input parameters
if isVerbose:
print('Data overridden by the user\'s input file:')
for key, value in inputdic.items():
if isVerbose and key in dic:
print('\t' + str(key) + ': ' + str(value))
dic[key] = value
# Fix the observatory to fit the backend
try:
dic = fixObservatory(dic)
except ValueError:
raise
if isVerbose:
print('Data submitted to SkyCalc:')
for key, value in dic.items():
print('\t' + str(key) + ': ' + str(value))
# Get the Sky
skyModel = SkyModel()
skyModel.callwith(dic)
skyModel.write(outputFilename)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment