Skip to content

Instantly share code, notes, and snippets.

@egoddard
Created July 13, 2013 04:11
Show Gist options
  • Save egoddard/5989382 to your computer and use it in GitHub Desktop.
Save egoddard/5989382 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
############################################################################
#
# MODULE: WV2_Radiometric_Correction_MSS.py
#
# AUTHOR(S): Eric Goddard, egoddard@memphis.edu
#
# PURPOSE: Converts WV-2 multispectral bands from DNs to top of
# atmosphere reflectance.
#
# COPYRIGHT: (C) 2013 Eric Goddard
#
# REFERENCES:
# 2010 Updike Todd and Chris Comp. Radiometric Use of WorldView-2
# Imagery. DigitalGlobe. Electronic Document,
# http://www.digitalglobe.com/sites/default/files/Radiometric_Use_of_WorldView-2_Imagery%20%281%29.pdf.
# Accessed 12 July 2013.
#
#############################################################################
import sys
import os
#import multiprocessing
import time
from collections import defaultdict
from datetime import datetime
import math
import grass.script as grass
try:
from lxml import etree
except ImportError:
try:
import cElementTree as etree
except ImportError:
try:
import elementtree.ElementTree as etree
except ImportError:
print "Failed to import ElementTree from any known source."
sys.exit(1)
if "GISBASE" not in os.environ:
print "You must b in GRASS GIS to run this program."
sys.exit(1)
def AssociateMetaDataWithRaster(rastList):
"""Creates a dictionary that consists of the raster name as the
key and the associated metadata file as the value by removing
band numbers and the tile information from the raster name."""
rasterMetaDict = {}
for rast in rastList:
#If a file begins with a number, GRASS replaces it with an 'x' when
#importing. All of my imagery was taken in Oct/Nov so the x is
#replaced with a 1.
metaName = "1" + rast[0][1:18] + rast[0][23:-2] + ".XML"
rasterMetaDict[rast[0]] = metaName.replace('_', '-', 2)
return rasterMetaDict
def EarthSunDistance(date):
"""Take a string date as input and converts it to the Julian day and
returns the Earth-Sun Distance for that day."""
acqTime = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ")
if acqTime.month == 1 or acqTime.month == 2:
month = acqTime.month + 12
year = acqTime.year - 1
else:
month = acqTime.month
year = acqTime.year
secondsMicro = acqTime.second + (acqTime.microsecond / 1000000.0)
UT = acqTime.hour + (acqTime.minute / 60.0) + (secondsMicro / 3600.0)
A = int(year / 100)
B = 2 - A + int(A / 4)
JD = int(365.25 * (year + 4716)) + \
int(30.6001 * (month + 1)) + acqTime.day + (UT / 24.0) + \
B - 1524.5
D = JD - 2451545.0
g = math.radians(357.529 + 0.98560028 * D)
return 1.00014 - 0.01671 * math.cos(g) - 0.00014 * math.cos(2 * g)
def ExtractVariablesFromMetadata(metadataFilePath, bandnum):
"""Takes the path to the XML file for the WorldView-2 image and the band
number and returns the required variables for the DN to Reflectance
calculation as a list.
The returned variables and their list indices:
0: Absolute Calibration Factor
1: Solar Zenith Angle
2: Earth-Sun Distance
3: Band Averaged Solar Spectral Irradiance
4: Effective Bandwidth
"""
with open(metadataFilePath, 'rb') as metadataFile:
root = etree.parse(metadataFile).getroot()
bandList = {1:"BAND_C", 2:"BAND_B", 3:"BAND_G", 4:"BAND_Y", 5:"BAND_R",
6:"BAND_RE", 7:"BAND_N", 8:"BAND_N2"}
if root.find("IMD/%s" % bandList[bandnum]) is not None:
absCalFactor = root.find("IMD/%s/ABSCALFACTOR" % bandList[bandnum]).text
effectiveBandWidth = root.find("IMD/%s/EFFECTIVEBANDWIDTH" % bandList[bandnum]).text
else:
print "Error: Did not find absCalFactor element."
if root.find("IMD/IMAGE/MEANSUNEL") is not None:
sunElevation = root.find("IMD/IMAGE/MEANSUNEL").text
else:
print "Error: Did not find sunElevation element"
if root.find("IMD/MAP_PROJECTED_PRODUCT/EARLIESTACQTIME") is not None:
acqTime = root.find("IMD/MAP_PROJECTED_PRODUCT/EARLIESTACQTIME").text
earthSunDist = EarthSunDistance(acqTime)
else:
print "Error: Did not find Acquisition Time Element"
#WV-2 Band-Averaged Solar Spectral Irradiance Table
SSI = {0: 1580.8140, #Pan
1: 1758.2229,
2: 1974.2416,
3: 1856.4104,
4: 1738.4791,
5: 1559.4555,
6: 1342.0695,
7: 1069.7302,
8: 861.2866}
return [float(absCalFactor), 90 - float(sunElevation), float(earthSunDist), SSI[bandnum], float(effectiveBandWidth)]
def CalculateTOAR(raster, bandParams):
"""Starts eight mapcalc operations, one for each band in the input raster's
region. bandParams is a list of lists; each inner list contains the raster
band number at the first index followed by the 5 variables returned from the
ExtractVariablesFromMetadata function.
The bandParams inner list indices refer to:
0: band number
1: Absolute Calibration Factor
2: Solar Zenith Angle
3: Earth-Sun Distance
4: Band Averaged Solar Spectral Irradiance
5: Effective Bandwidth"""
print "Changing Region settings to %s.1..." % raster
grass.run_command('g.region', flags='p', rast="%s.1" % raster)
grass.use_temp_region()
calc = "$output = ((($absCal * $input_rast) / $efb) * $esd^2 * $pi) / ($eSun * cos($theta))"
grass.message("Calculating Top of Atmosphere Reflectance for %s" % raster)
p1 = grass.mapcalc_start(calc, output="%s_TOAR.%i" % (raster, bandParams[0][0]),
absCal=bandParams[0][1], input_rast="%s.%i" % (raster, bandParams[0][0]),
esd=bandParams[0][3], pi=math.pi, eSun=bandParams[0][4],
theta=bandParams[0][2], efb=bandParams[0][5])
p2 = grass.mapcalc_start(calc, output="%s_TOAR.%i" % (raster, bandParams[1][0]),
absCal=bandParams[1][1], input_rast="%s.%i" % (raster, bandParams[1][0]),
esd=bandParams[1][3], pi=math.pi, eSun=bandParams[1][4],
theta=bandParams[1][2], efb=bandParams[1][5])
p3 = grass.mapcalc_start(calc, output="%s_TOAR.%i" % (raster, bandParams[2][0]),
absCal=bandParams[2][1], input_rast="%s.%i" % (raster, bandParams[2][0]),
esd=bandParams[2][3], pi=math.pi, eSun=bandParams[2][4],
theta=bandParams[2][2], efb=bandParams[2][5])
p4 = grass.mapcalc_start(calc, output="%s_TOAR.%i" % (raster, bandParams[3][0]),
absCal=bandParams[3][1], input_rast="%s.%i" % (raster, bandParams[3][0]),
esd=bandParams[3][3], pi=math.pi, eSun=bandParams[3][4],
theta=bandParams[3][2], efb=bandParams[3][5])
p5 = grass.mapcalc_start(calc, output="%s_TOAR.%i" % (raster, bandParams[4][0]),
absCal=bandParams[4][1], input_rast="%s.%i" % (raster, bandParams[4][0]),
esd=bandParams[4][3], pi=math.pi, eSun=bandParams[4][4],
theta=bandParams[4][2], efb=bandParams[4][5])
p6 = grass.mapcalc_start(calc, output="%s_TOAR.%i" % (raster, bandParams[5][0]),
absCal=bandParams[5][1], input_rast="%s.%i" % (raster, bandParams[5][0]),
esd=bandParams[5][3], pi=math.pi, eSun=bandParams[5][4],
theta=bandParams[5][2], efb=bandParams[5][5])
p7 = grass.mapcalc_start(calc, output="%s_TOAR.%i" % (raster, bandParams[6][0]),
absCal=bandParams[6][1], input_rast="%s.%i" % (raster, bandParams[6][0]),
esd=bandParams[6][3], pi=math.pi, eSun=bandParams[6][4],
theta=bandParams[6][2], efb=bandParams[6][5])
p8 = grass.mapcalc_start(calc, output="%s_TOAR.%i" % (raster, bandParams[7][0]),
absCal=bandParams[7][1], input_rast="%s.%i" % (raster, bandParams[7][0]),
esd=bandParams[7][3], pi=math.pi, eSun=bandParams[7][4],
theta=bandParams[7][2], efb=bandParams[7][5])
p1.wait()
p2.wait()
p3.wait()
p4.wait()
p5.wait()
p6.wait()
p7.wait()
p8.wait()
def main():
#Path to folder containing WV-2 imagery and metadata XML files
imageryPath = r"/path/to/imagery/folder"
rasterList = grass.mlist_pairs('rast', pattern='*M3DS*', mapset="PERMANENT")
metadataDict = AssociateMetaDataWithRaster(rasterList)
rasterDict = defaultdict(list)
for key in metadataDict:
grass.message("Extracting variables for %s" % key)
#Pass image path and band number to ExtractVariablesFromMetadata function
variables = ExtractVariablesFromMetadata(os.path.join(imageryPath,
metadataDict[key]), int(key[-1]))
#assign extracted variables to a new dictionary that consists of the
#image name minus the band number as key that has a list containing
#the band number and variables as its value for passing to the
#CalculateTOAR function.
rasterDict[key[:-2]].append([int(key[-1])] + variables)
for key in rasterDict:
CalculateTOAR(key, rasterDict[key])
if __name__ == '__main__':
#Start multiprocessing of radiometric corrections
t0 = time.time()/60
print "Start"
main()
print "Finish"
print (time.time()/60) - t0, "minutes processing time"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment