Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Convert CIMSS CSPP S-NPP VIIRS-AF output from HDFv5 to CSV
#!/usr/bin/env python
# Author:
# Source URL:
import os, sys, re
from optparse import OptionParser
from datetime import datetime
import logging
import tables
desc_text = """Convert CSPP VIIRS-AF AVAFO HDFv5 output to CSV format.
Unsupported node type causes pytables UserWarning which is safe to ignore
e.g. /Data_Products/VIIRS-AF-EDR/VIIRS-AF-EDR_Gran_0"""
usage_text = """usage: %prog [options] AVAFO_npp_hdf5 [...]"""
def processFile(hdf, fire_datetime, outdir=None, outfile=None, dryrun=False):
if not outfile:
outfile = re.sub('\.h5$', '.txt', os.path.basename(hdf), re.I)
outfile = fire_datetime.strftime(outfile)
if not outdir or not os.path.isdir(os.path.dirname(outdir)):
outdir = os.path.dirname(hdf)
outdir = fire_datetime.strftime(outdir)
outpath = os.path.join(outdir, outfile)
with tables.openFile(hdf, "r") as f:
# Get the path of wanted nodes
# e.g.
# /All_Data/VIIRS-AF-EDR_All/Latitude/Latitude_0
# /All_Data/VIIRS-AF-EDR_All/Longitude/Longitude_0
paths = {
'Latitude': None,
'Longitude': None,
'QF4': None # Fire detection confidence
for n in f.walkNodes(classname='EArray'):
# Unsupported node type causes pytables UserWarning which is safe to ignore
# e.g. /Data_Products/VIIRS-AF-EDR/VIIRS-AF-EDR_Gran_0
for kw in paths:
if kw in n._v_pathname:
if paths[kw]: logging.warning("Seen '%s' multiple times in walkNodes (%s): %s %s" % (kw, hdf, paths[kw], n._v_pathname))
paths[kw] = n._v_pathname
not_found = filter(lambda kw: paths[kw] is None, paths)
if not_found:
logging.error("Can't get node paths for %s in %s" % (not_found, hdf))
return False
lats = f.getNode(paths['Latitude'])
lons = f.getNode(paths['Longitude'])
confidence = f.getNode(paths['QF4'])
numrec = len(lats)
logging.debug("Found %d records in %s" % (numrec, hdf))
if numrec < 1: return False
if dryrun is True:
outfh = sys.stdout
outfh = open(outpath, 'w+')
# Output format
# lat,lon,t13_k,scan,track,confidence,frp_mw
# 'real','real','real','real','real','integer','real'
for i in range(numrec):
outfh.write("%s,%s,-1,-1,-1,%s,-1\n" % (lats[i], lons[i], confidence[i]))
if not dryrun: outfh.close()"Detected %d records at %s in %s" % (numrec, fire_datetime, outpath))
return True
def main(argv=None):
if argv is None:
argv = sys.argv
debuglevelD = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL,
defvals = {
parser = OptionParser(usage=usage_text, description=desc_text)
parser.add_option("--outdir", dest="outdir", type="string", \
help="Output directory. Default same as input.", metavar="DIR")
parser.add_option("--outfile", dest="outfile", type="string", \
help="Output filename specification. Default same as input with .txt")
parser.add_option("-n", "--dryrun", dest="dryrun", action="store_true", \
help="Dry-run without creating output file", metavar='BOOL')
parser.add_option("-l", "--loglevel", dest="loglevel", type="string", \
help="Verbosity %s"%debuglevelD.keys(), metavar='LOGLEVEL')
(options, args) = parser.parse_args()
if options.loglevel:
if options.loglevel not in debuglevelD: raise AssertionError("Log level must be one of: %s"%debuglevelD.keys())
dbglvl = debuglevelD[options.loglevel]
dbglvl = logging.WARNING
logger = logging.getLogger()
ch = logging.StreamHandler()
ch.setFormatter( logging.Formatter('%(asctime)s %(lineno)d %(name)s %(funcName)s - %(levelname)s - %(message)s') )
if len(args) < 1:
parser.error("Requires one or more AVAFO input file")
for h5 in args:
if os.path.isfile(h5) is False:
logger.warning("%s is not a file" % h5)
# Extract fire timestamp in GMT from filename
# AVAFO_npp_d{YYYYMMDD}_t{HHMMSS}_e{HHMMSS}_b00001_c20130124134249021000_cspp_dev.h5
# AVAFO_npp_d20130218_t1206274_e1207515_b00001_c20130218124844523582_cspp_dev.h5
f = os.path.basename(h5)
#m ='_d(\d{8})_', f, re.I)
#if m is None: raise ValueError("No _dYYYYMMDD_ in '%s'" %(f))
#yyyymmdd =
#m ='_t(\d{4})\d{3}_', f, re.I)
#if m is None: raise ValueError("No _tHHMMXXX_ in '%s'" %(f))
#hhmm =
#fire_datetime = datetime.strptime('%s %s'%(yyyymmdd, hhmm), '%Y%m%d %H%M')
m ='_c(?P<yyyymmdd>\d{8})(?P<hhmm>\d{4})\d+', f, re.I)
if m is None: raise ValueError("No _c{yyyymmdd}{hhmm} in '%s'"%f)
fire_datetime = datetime.strptime('%(yyyymmdd)s %(hhmm)s'%m.groupdict(), '%Y%m%d %H%M')
processFile(h5, fire_datetime,
except Exception, err:
raise ValueError("Failed to process '%s': %s" % (f, err))
return 0
if __name__ == "__main__":
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment