@cynici
Created December 7, 2012 09:37
NPP: List best-matching ancillary file required by DRL algorithm
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import sys, os, logging, re
from optparse import OptionParser
from datetime import datetime, timedelta
from glob import iglob as glob
help_text = """%prog [options] -t TYPE npp_dat_file
DRL algorithms require ancillary files that are closest to,
but prior to, the date of the npp_dat_file, e.g.
drl.tle.yyyymmdd
off_USNO-PolarWander-UT1-ANC_Ser7_USNO_000f_yyyymmdd_*
{Sensor}-SDR_BE_LUTs_yymmdd.tar.gz"""
# NPP raw filename format made by EOS FES
datetime_re = re.compile(r'\D(?P<yyyy>\d{4})(?P<mm>\d{2})(?P<dd>\d{2})\D(?P<HH>\d{2})(?P<MM>\d{2})(?P<SS>\d{2})\D')
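# Illustration (hypothetical filename, not a real EOS FES product name):
# a basename such as "npp.20121207.093700.dat" matches the pattern above with
# yyyy=2012, mm=12, dd=07, HH=09, MM=37, SS=00, which main() below assembles
# into the overpass datetime 2012-12-07 09:37:00.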
ftypes = {
    "tle": {
        "fnformat": "drl.tle.%Y%m%d*",
        "get_datetime": lambda f: datetime.strptime(f, "drl.tle.%Y%m%d%H"),
        "dir": "$HOME/drl/ancillary/tle",
        "url": "ftp://is.sci.gsfc.nasa.gov/ancillary/ephemeris/tle/",
        "range": range(0, 15),
    },
    "leapsec": {
        "fnformat": "leapsec.%Y%m%d*.dat",
        "get_datetime": lambda f: datetime.strptime(f, "leapsec.%Y%m%d%H.dat"),
        "dir": "$HOME/drl/ancillary/temporal",
        "url": "ftp://is.sci.gsfc.nasa.gov/ancillary/temporal/",
        "range": range(0, 15),
    },
    "polar": {
        "fnformat": "off_USNO-PolarWander-UT1-ANC_Ser7_USNO_000f_%Y%m%d_*",
        "dir": "$HOME/drl/ancillary/temporal",
        "url": "ftp://is.sci.gsfc.nasa.gov/ancillary/temporal/",
        "range": range(0, 31),
    },
    "viirslut": {
        "fnformat": "VIIRS-SDR_LUTs_%y%m%d.tar.gz",
        "dir": "$HOME/drl/ancillary/luts",
        "url": "ftp://is.sci.gsfc.nasa.gov/ancillary/LUTs/npp/viirs/",
    },
    "viirsbelut": {
        "fnformat": "VIIRS-SDR_BE_LUTs_%y%m%d.tar.gz",
        "dir": "$HOME/drl/ancillary/luts",
        "url": "ftp://is.sci.gsfc.nasa.gov/ancillary/LUTs/npp/viirs/",
        "range": range(1, 200),
    },
    "atmslut": {
        "fnformat": "ATMS-SDR_BE_LUTs_%y%m%d.tar.gz",
        "dir": "$HOME/drl/ancillary/luts",
        "url": "ftp://is.sci.gsfc.nasa.gov/ancillary/LUTs/npp/atms/",
    },
    "crislut": {
        "fnformat": "CRIS-SDR_BE_LUTs_%y%m%d.tar.gz",
        "dir": "$HOME/drl/ancillary/luts",
        "url": "ftp://is.sci.gsfc.nasa.gov/ancillary/LUTs/npp/cris/",
    },
}
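# Illustration (assumed values, not from the source): for ftype "tle" and an
# overpass on 2012-12-07, main() below tries the glob patterns
# "drl.tle.20121207*", "drl.tle.20121206*", ... stepping back through
# range(0, 15) days, and prints the first existing file whose own timestamp
# precedes the overpass.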
def main(argv=None):
    if argv is None:
        argv = sys.argv
    debuglevelD = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL,
    }
    defvals = {
    }
    parser = OptionParser(help_text)
    parser.add_option("-t", dest="ftype", type="string",
        help="Ancillary file type. Options: %s" % (sorted(ftypes.keys())), metavar='TYPE')
    parser.add_option("--dir", dest="dir", type="string",
        help="Local directory for specific type of ancillary file.", metavar='DIR')
    parser.add_option("-l", "--loglevel", dest="loglevel", type="string",
        help="Verbosity %s" % debuglevelD.keys(), metavar='LOGLEVEL')
    parser.set_defaults(**defvals)
    (options, args) = parser.parse_args()
    # Configure logging verbosity.
    if options.loglevel:
        if options.loglevel not in debuglevelD:
            parser.error("Log level must be one of: %s" % debuglevelD.keys())
        dbglvl = debuglevelD[options.loglevel]
    else:
        dbglvl = logging.WARNING
    logger = logging.getLogger()
    logger.setLevel(dbglvl)
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter('%(asctime)s %(lineno)d %(name)s %(funcName)s - %(levelname)s - %(message)s'))
    ch.setLevel(dbglvl)
    logger.addHandler(ch)
    if options.ftype not in ftypes:
        parser.error("Missing or invalid ancillary file type.\nOptions: %s" % sorted(ftypes.keys()))
    if len(args) != 1:
        parser.error("Requires exactly one NPP dat filename.")
    type_dict = ftypes[options.ftype]
    # Extract the overpass datetime from the NPP filename.
    m = datetime_re.search(os.path.basename(args[0]))
    if m:
        overpass_dt = datetime.strptime("%(yyyy)s-%(mm)s-%(dd)s %(HH)s:%(MM)s:%(SS)s" % m.groupdict(), "%Y-%m-%d %H:%M:%S")
    else:
        parser.error("Failed to extract datetime from given NPP filename: %s" % args[0])
    if options.dir:
        dir = os.path.expandvars(options.dir)
    else:
        dir = os.path.expandvars(type_dict["dir"])
    if os.path.isdir(dir):
        os.chdir(dir)
        logger.debug("Local directory: %s" % dir)
    if "range" in type_dict and type_dict["range"]:
        daterange = type_dict["range"]
    else:
        daterange = range(1, 50)  # arbitrary limit
    # Step back one day at a time and report the first file dated before the overpass.
    for d in daterange:
        file_dt = overpass_dt - timedelta(days=d)
        fnpattern = file_dt.strftime(type_dict["fnformat"])
        for file in glob(fnpattern):
            if "get_datetime" in type_dict and type_dict["get_datetime"]:
                file_dt = type_dict["get_datetime"](file)
            if file_dt < overpass_dt:
                print os.path.join(dir, file)
                return 0
    print >>sys.stderr, "No match. Download %s closest but prior to %s from:\n%s\nto:\n%s" % \
        (type_dict["fnformat"], overpass_dt.strftime("%Y-%m-%d"), type_dict["url"], type_dict["dir"])
    return 1

if __name__ == "__main__":
    sys.exit(main())
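For reference, a minimal standalone sketch of the same day-by-day backward search, kept separate from the script above. The directory, naming pattern, and example overpass time are illustrative assumptions, not taken from the gist, and it omits the per-file timestamp parsing (get_datetime) that the script applies to tle and leapsec files.

import os
from datetime import datetime, timedelta
from glob import iglob

def newest_prior(anc_dir, fnformat, overpass_dt, max_days=15):
    """Return one ancillary file dated before overpass_dt, or None (sketch only)."""
    for d in range(max_days):
        file_dt = overpass_dt - timedelta(days=d)
        # Build a glob pattern for that day, e.g. "drl.tle.20121206*".
        pattern = os.path.join(anc_dir, file_dt.strftime(fnformat))
        for path in iglob(pattern):
            if file_dt < overpass_dt:
                return path
    return None

# Example call with made-up values (hypothetical directory):
# newest_prior("/home/user/drl/ancillary/tle", "drl.tle.%Y%m%d*",
#              datetime(2012, 12, 7, 9, 37))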