Skip to content

Instantly share code, notes, and snippets.

@Nullpo1nt
Created June 24, 2020 23:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Nullpo1nt/70109f56fa47fd7e0fb924ac1bbbd508 to your computer and use it in GitHub Desktop.
Save Nullpo1nt/70109f56fa47fd7e0fb924ac1bbbd508 to your computer and use it in GitHub Desktop.
Parses NOAA TAF reports into a basic structure.
import re
import ftplib
import sys
# Lookup tables translating the abbreviations found in a TAF report into
# human-readable text.

# Cloud-type suffix that may follow a sky-condition group.
cloudType = {
    "CB": "Cumulonimbus",
}

# Sky cover amounts.
skyCondition = {
    "SKC": "Sky clear",
    "FEW": "Few",
    "SCT": "Scattered",
    "BKN": "Broken",
    "OVC": "Overcast",
}

# Intensity / proximity prefixes; the single-space key stands for "no prefix".
intensity = {
    "-": "Light",
    " ": "Moderate",
    "+": "Heavy",
    "VC": "Vicinity",
}

# Weather descriptors.
qualifier = {
    "MI": "Shallow",
    "BC": "Patches",
    "DR": "Low Drifting",
    "BL": "Blowing",
    "SH": "Showers",
    "TS": "Thunderstorm",
    "FZ": "Freezing",
    "PR": "Partial",
}

# Precipitation types.
precipitation = {
    "DZ": "Drizzle",
    "RA": "Rain",
    "SN": "Snow",
    "SG": "Snow Grains",
    "IC": "Ice Crystals",
    "PL": "Ice Pellets",
    "GR": "Hail",
    "GS": "Small Hail or Snow Pellets",  # less than 1/4 inch in diameter
    "UP": "Unknown precipitation",
}

# Obscurations.
obscuration = {
    "BR": "Mist",  # foggy conditions with visibility greater than 5/8 statute mile
    "FG": "Fog",  # visibility 5/8 statute mile or less
    "FU": "Smoke",
    "DU": "Dust",
    "SA": "Sand",
    "HZ": "Haze",
    "PY": "Spray",
    "VA": "Volcanic Ash",
}

# Other weather phenomena.
other = {
    "PO": "Well-Developed Dust/Sand Whirls",
    "SQ": "Squalls",
    "FC": "Funnel Cloud",
    "SS": "Sandstorm",
    "DS": "Duststorm",
}
###########################################################
#
###########################################################
def getDateTime(s):
    """Return the time-group string *s* unchanged.

    This is a placeholder: no conversion is performed yet.
    """
    # TODO: render the raw digits as an "HH:00Z to HH:00Z" range.  The
    # layouts sketched by the original author were:
    #   three digit pairs        -> "<2nd>:00Z to <3rd>:00Z"
    #   two pairs '/' two pairs  -> "<2nd>:00Z to <4th>:00Z"
    #   two digit pairs          -> "<1st>:00Z to <2nd>:00Z"
    return s
###########################################################
#
###########################################################
class Base(object):
    """Common base for the TAF parse-tree nodes.

    Supplies overridable ``toString``/``parse`` stubs plus two string
    helpers (``filterSplit`` and ``normalize``) used by the parsers.
    """

    def toString(self):
        """Return a placeholder description; subclasses override this."""
        return "?Base{}?"

    def parse(self, s):
        """Populate this node from the string *s*; a no-op in the base class."""
        pass

    def filterSplit(self, list):
        """Remove empty (or whitespace-only) elements from a list.

        Returns a new list; the argument is left untouched.  The parameter
        keeps its original name (which shadows the builtin inside this
        method only) so keyword callers keep working.
        """
        return [item for item in list if not re.match(r'^\s*$', item)]

    def normalize(self, s):
        """Return *s* cleaned up for parsing.

        Everything from the first "=" to the end of that line is dropped,
        leading and trailing whitespace is removed, every run of whitespace
        collapses to a single space, and the result is upper-cased.  Text in
        front of the first "TAF" is not removed.
        """
        s = re.sub(r'=.*', '', s)
        s = re.sub(r'^\s+|\s+$', '', s)
        s = re.sub(r'\s+', ' ', s)
        return s.upper()
###########################################################
#
###########################################################
class Values(Base):
    """Ordered collection of values with simple accessors."""

    def __init__(self, values=None):
        """Start from *values* (used as-is, not copied) or an empty list."""
        if values is None:
            values = []
        self.values = values

    def addValue(self, value):
        """Append *value* to the collection."""
        self.values.append(value)

    def getValues(self):
        """Return the underlying list."""
        return self.values

    def getValue(self, pos):
        """Return the value at index *pos*, or None when out of range.

        Negative indices are treated as out of range.
        """
        if 0 <= pos < len(self.values):
            return self.values[pos]
        return None

    def toString(self):
        """Return the values rendered as "value=<v>, " for each entry."""
        # The original appended the literal "i" instead of the loop value,
        # so every entry printed as "value=i, ".
        return ''.join("value=" + str(value) + ", " for value in self.values)
###########################################################
#
###########################################################
class TAFReport(Base):
    """TAFReport = 'TAF' TAF SP 1*RAF SP '='

    Top-level container: the leading TAF group plus the change groups
    (TEMPO / FM / PROB / BECMG) that follow it, kept in report order.
    """

    def __init__(self, taf=None, rafs=None):
        """Wrap *taf* and *rafs*, creating an empty TAF / list when omitted."""
        self.taf = TAF() if taf is None else taf
        self.rafs = [] if rafs is None else rafs

    def parse(self, s):
        """Parse the TAF string *s* into this report.

        The result is stored on ``self.taf`` and appended to ``self.rafs``;
        nothing is returned.  Blocks that match no known group are reported
        on stdout and skipped.
        """
        # Built here rather than at class level because these node classes
        # are defined further down the module.
        handlers = (
            (r'^TEMPO\s+', Temporary),
            (r'^FM\d+\s+', From),
            (r'^PROB\d+\s+', Probability),
            (r'^BECMG\s+', Becoming),
        )

        text = self.normalize(s)

        # Split into blocks based on group identifiers.  The capturing group
        # keeps each identifier as its own fragment; the text that followed
        # it starts with whitespace and is glued back on below.
        fragments = re.split(r'\s*(TEMPO|FM\d+|PROB\d+|BECMG)', text)
        blocks = []
        for fragment in fragments:
            if blocks and re.search(r'^\s+', fragment):
                fragment = blocks.pop() + fragment
            blocks.append(fragment)

        for block in blocks:
            if not block:
                # re.split yields '' when the text starts with an identifier
                # or two identifiers are adjacent; there is nothing to report.
                continue
            if re.match(r'^TAF\s+', block):
                self.taf.parse(block)
                continue
            for pattern, nodeClass in handlers:
                if re.match(pattern, block):
                    node = nodeClass()
                    node.parse(block)
                    self.rafs.append(node)
                    break
            else:
                print("\tUnknown format: ", block)

    def toString(self):
        """Return a multi-line dump of the TAF group and every change group."""
        parts = ["TAFReport{\ntaf=" + self.taf.toString()]
        parts.extend("\nrafs[]=" + item.toString() for item in self.rafs)
        parts.append("\n}")
        return ''.join(parts)
###########################################################
#
###########################################################
class TAF(Base):
    """TAF = [SP 'AMD'] SP ICAO SP IssuedTime SP ValidRange SP Forecast

    Report header (station, amendment flag, issue time, validity window)
    together with the initial forecast group.  Times are stored as
    "DD HH:MM" strings built from the digits in the report; the report
    carries no month or year.
    """

    # Shared start of both header layouts: optional AMD, a four-character
    # station identifier and the six-digit issue time followed by 'Z'.
    _HEADER = r'TAF\s+(?:(AMD)\s+)?(\w{4})\s+(\d{2})(\d{2})(\d{2})Z\s+'

    def __init__(self, icao="", ammended="", issueTime="", validTimeFrom="", validTimeTo="", forecast=None):
        """Store the header fields; an empty Forecast is created when omitted."""
        self.icao = icao
        self.ammended = ammended
        self.issueTime = issueTime
        self.validTimeFrom = validTimeFrom
        self.validTimeTo = validTimeTo
        self.forecast = Forecast() if forecast is None else forecast

    def _setHeader(self, match):
        """Copy station, amendment flag and issue time out of a header match."""
        self.icao = match.group(2)
        if match.group(1):
            self.ammended = "AMD"
        self.issueTime = match.group(3) + " " + match.group(4) + ":" + match.group(5)

    def parse(self, s):
        """Parse a "TAF ..." block: header fields first, then the forecast.

        Nothing is changed when *s* cannot be split into a header and a
        forecast body.
        """
        r = self.filterSplit(re.split(r'(TAF\s+.*?\s+\d+(?:\/\d+)?)\s+', s))
        if len(r) != 2:
            return
        header, body = r

        # Legacy validity group DDHHHH: one day, start hour, end hour.
        match = re.search(self._HEADER + r'(\d{2})(\d{2})(\d{2})', header)
        if match:
            self._setHeader(match)
            day = int(match.group(6))
            fromHour = int(match.group(7))
            toHour = int(match.group(8))
            # The group names only the starting day.  An end hour that is not
            # later than the start hour means the window runs into the next
            # day.  (The original added 1 to the day *string*, a TypeError.)
            # NOTE: a roll-over past the end of a month cannot be resolved
            # here because the report does not say which month it is.
            toDay = day + 1 if toHour <= fromHour else day
            self.validTimeFrom = match.group(6) + " " + match.group(7) + ":00"
            self.validTimeTo = "%02d %s:00" % (toDay, match.group(8))

        # Current validity group DDHH/DDHH: explicit start and end day.
        match = re.search(self._HEADER + r'(\d{2})(\d{2})\/(\d{2})(\d{2})', header)
        if match:
            self._setHeader(match)
            self.validTimeFrom = match.group(6) + " " + match.group(7) + ":00"
            self.validTimeTo = match.group(8) + " " + match.group(9) + ":00"

        self.forecast.parse(body)

    def toString(self):
        """Return a multi-line dump of the header fields and the forecast."""
        return ''.join([
            "TAF{\nicao=", self.icao,
            ", \nammended=", self.ammended,
            ", \nissueTime=", self.issueTime,
            ", \nvalidTimeFrom=", self.validTimeFrom,
            ", \nvalidTimeTo=", self.validTimeTo,
            ", \nforecast=", self.forecast.toString(),
            "\n}",
        ])
###########################################################
#
###########################################################
class RAF(Base):
    """RAF = (Probability | From | Temporary | Becoming) SP Forecast

    Shared state for the change groups: a time window and the forecast
    that applies during it.  Parsing is left to the subclasses.
    """

    def __init__(self, fromTime="", toTime="", forecast=None):
        """Store the window; an empty Forecast is created when omitted."""
        self.fromTime, self.toTime = fromTime, toTime
        self.forecast = Forecast() if forecast is None else forecast

    def parse(self, s):
        """Do nothing; each concrete change group supplies its own parser."""
        return None

    def toString(self):
        """Return the fixed placeholder text for a bare RAF."""
        return "RAF{}"
###########################################################
#
###########################################################
class Probability(RAF):
    """Probability = 'PROB' 2*2 DIGIT SP 4*4DIGIT

    Change group giving the forecast that applies with a stated
    probability during a time window.
    """

    def __init__(self, probability=-1, fromTime="", toTime="", forecast=None):
        """Create the group; *probability* stays -1 until a report is parsed."""
        # The original skipped the RAF initialiser, leaving fromTime, toTime
        # and forecast undefined on every new instance.
        RAF.__init__(self, fromTime, toTime, forecast)
        self.probability = probability

    def parse(self, s):
        """Parse a "PROBnn <time> <forecast>" block into this node.

        The probability digits and the time group are stored as strings;
        the rest is handed to the forecast parser.  Nothing is changed when
        *s* does not split into a header and a body.
        """
        r = self.filterSplit(re.split(r'(PROB\d{2}\s+\d+(?:\/\d+)?)\s+', s))
        if len(r) == 2:
            match = re.search(r'PROB(\d+)\s+(\d+(?:\/\d+)?)', r[0])
            if match:
                self.probability = match.group(1)
                self.fromTime = match.group(2)
            # Parse into the existing Forecast instance.  The original called
            # Forecast.parse on the class itself, which raises TypeError.
            self.forecast.parse(r[1])

    def toString(self):
        """Return a multi-line dump of the probability, time and forecast."""
        # str() because the default probability is the int -1, while parsed
        # values are strings.
        return ''.join([
            "Probability{\nprobability=", str(self.probability),
            ", \nfromTime=", self.fromTime,
            ", \nforecast=", self.forecast.toString(),
            "\n}",
        ])
###########################################################
#
###########################################################
class From(RAF):
    """From = 'FM' 4*4DIGIT

    Change group introduced by "FM" followed by digits.
    """

    def parse(self, s):
        """Parse an "FM<digits> <forecast>" block into this node.

        The digits after "FM" become ``fromTime``; the remainder goes to the
        forecast parser.  Input that does not split into exactly a header
        and a body is ignored.
        """
        parts = self.filterSplit(re.split(r'(FM\d+\s+)', s))
        if len(parts) != 2:
            return
        header, body = parts
        found = re.search(r'FM(\d+)', header)
        if found:
            self.fromTime = found.group(1)
        self.forecast.parse(body)

    def toString(self):
        """Return a multi-line dump of the start time and the forecast."""
        return ("From{\nfromTime=" + self.fromTime
                + ", \nforecast=" + self.forecast.toString()
                + "\n}")
###########################################################
#
###########################################################
class Temporary(RAF):
    """Temporary = 'TEMPO' SP 4*4DIGIT

    Change group introduced by "TEMPO" and a time group.
    """

    def parse(self, s):
        """Parse a "TEMPO <time> <forecast>" block into this node.

        The whole time group (including any "/" part) is stored in
        ``fromTime``; the remainder goes to the forecast parser.  Input that
        does not split into exactly a header and a body is ignored.
        """
        parts = self.filterSplit(re.split(r'(TEMPO\s+\d+(?:\/\d+)?)\s+', s))
        if len(parts) != 2:
            return
        header, body = parts
        found = re.match(r'TEMPO\s+(\d+(?:\/\d+)?)', header)
        if found:
            self.fromTime = found.group(1)
        self.forecast.parse(body)

    def toString(self):
        """Return a multi-line dump of the time group and the forecast."""
        return ("Temporary{\nfromTime=" + self.fromTime
                + ", \nforecast=" + self.forecast.toString()
                + "\n}")
###########################################################
#
###########################################################
class Becoming(RAF):
    """Becoming = 'BECMG' SP 4*4DIGIT

    Change group introduced by "BECMG" and a time group.
    """

    def parse(self, s):
        """Parse a "BECMG <time> <forecast>" block into this node.

        The whole time group (including any "/" part) is stored in
        ``fromTime``; the remainder goes to the forecast parser.  Nothing is
        changed when *s* does not split into a header and a body.
        """
        r = self.filterSplit(re.split(r'(BECMG\s+\d+(?:\/\d+)?)\s+', s))
        if len(r) == 2:
            match = re.match(r'BECMG\s+(\d+(?:\/\d+)?)', r[0])
            if match:
                self.fromTime = match.group(1)
            # Parse into the existing Forecast instance.  The original called
            # Forecast.parse on the class itself (a TypeError) and assigned
            # its result, which would have replaced the forecast with None.
            self.forecast.parse(r[1])

    def toString(self):
        """Return a multi-line dump of the time group and the forecast."""
        return ("Becoming{\nfromTime=" + self.fromTime
                + ", \nforecast=" + self.forecast.toString()
                + "\n}")
###########################################################
#
###########################################################
class Forecast(Base):
    """Forecast = Wind [SP Visibility] 0*(SP Weather) 0*(SP Condition) [SP WindShear]

    One set of forecast conditions: wind, visibility, any number of weather
    and sky-condition groups, and wind shear.
    """

    # Compiled once instead of on every token.
    # Splits a forecast body on visibility, sky-condition, wind-shear and
    # weather groups (kept through the capturing group); any other text,
    # such as the wind group, survives as the pieces between the matches.
    _SPLIT_RE = re.compile(r'\s*((?:P6|[0-6]|(?:\d\s+)?\d\/\d)SM|(?:(?:FEW|SCT|BKN|OVC)\d{3}(?:CB)?|SKC)|WS\d+\/\d+KT|(?:-|\+|VC)?(?:MI|BC|DR|BL|SH|TS|FZ|PR|DZ|RA|SN|SG|IC|PL|GR|GS|UP|BR|FG|FU|DU|SA|HZ|PY|VA|PO|SQ|FC|SS|DS|NSW)+)\s*')
    _WIND_RE = re.compile(r'^(\d{3}|VRB)(\d+)(?:G(\d+))?KT')
    _VISIBILITY_RE = re.compile(r'^(P6|(?:[0-6]\s+)?\d\/\d|[0-6])SM')
    _CONDITION_RE = re.compile(r'^(SKC)|(FEW|SCT|BKN|OVC)(\d{3})(CB)?')
    _WEATHER_RE = re.compile(r'^(-|\+|VC)?(MI|BC|DR|BL|SH|TS|FZ|PR)?((?:DZ|RA|SN|SG|IC|PL|GR|GS|UP)*)(BR|FG|FU|DU|SA|HZ|PY|VA)?(PO|SQ|FC|SS|DS)?(NSW)?$')
    _WINDSHEAR_RE = re.compile(r'^WS(\d+)\/(\d{3})(\d+)KT')

    def __init__(self, wind=None, visibility=None, weather=None, condition=None, windshear=None):
        """Store the parts, creating an empty default for each one omitted."""
        self.wind = Wind() if wind is None else wind
        self.visibility = Visibility() if visibility is None else visibility
        self.weather = [] if weather is None else weather
        self.condition = [] if condition is None else condition
        self.windshear = WindShear() if windshear is None else windshear

    def parse(self, s):
        """Parse the forecast body *s* and fill in this object's parts.

        Each token is tried as wind, visibility, sky condition, weather and
        wind shear, in that order.  A token that matches none of them is
        reported on stdout and skipped.
        """
        for item in self.filterSplit(self._SPLIT_RE.split(s)):
            match = self._WIND_RE.match(item)
            if match:
                self.wind = Wind(match.group(1), match.group(2))
                if match.group(3):
                    self.wind.gusting = match.group(3)
                continue

            match = self._VISIBILITY_RE.match(item)
            if match:
                self.visibility = Visibility(match.group(1))
                continue

            match = self._CONDITION_RE.match(item)
            if match:
                if match.group(1):
                    # SKC carries neither a height nor a cloud type.
                    condition = Condition(match.group(1))
                else:
                    condition = Condition(match.group(2), match.group(3))
                    if match.group(4):
                        condition.cb = match.group(4)
                self.condition.append(condition)
                continue

            match = self._WEATHER_RE.match(item)
            if match:
                self._addWeather(match)
                continue

            match = self._WINDSHEAR_RE.match(item)
            if match:
                # Regex groups are (altitude, heading, speed); WindShear takes
                # (heading, speed, altitude).
                self.windshear = WindShear(match.group(2), match.group(3), match.group(1))
                continue

            # The original printed the literal Perl placeholder "$_" here
            # rather than the offending token.
            print(">>>>>>>>> Forecast error, unkown format: \"" + item + "\" <<<<<<<<<<<")

    def _addWeather(self, match):
        """Append the NSW or Weather node described by a weather-regex match."""
        if match.group(6):
            self.weather.append(NSW())
            return
        if not any(match.group(i) for i in range(1, 6)):
            return
        w = Weather()
        if match.group(1):
            w.intensity.value = match.group(1)
        if match.group(2):
            w.descriptor.value = match.group(2)
        if match.group(3):
            # Group 3 is a run of two-letter precipitation codes, e.g. "SNPL".
            for code in re.findall(r'\w{2}', match.group(3)):
                w.percipitation.addValue(code)
        if match.group(4):
            w.obscuration.value = match.group(4)
        if match.group(5):
            w.other.value = match.group(5)
        self.weather.append(w)

    def toString(self):
        """Return a multi-line dump of every part of the forecast."""
        parts = [
            "Forecast{\nwind=" + self.wind.toString(),
            ", \nvisibility=" + self.visibility.toString(),
        ]
        parts.extend(", \nweather[]=" + item.toString() for item in self.weather)
        parts.extend(", \ncondition[]=" + item.toString() for item in self.condition)
        parts.append(", \nwindshear=" + self.windshear.toString())
        parts.append("\n}")
        return ''.join(parts)
###########################################################
#
###########################################################
class Wind:
    """Wind = Direction Speed ['G' Speed] 'KT'

    Holds the three pieces of a wind group as strings; an empty string
    means the piece is absent.
    """

    def __init__(self, heading="", speed="", gusting=""):
        """Store the direction, speed and gust values as given."""
        self.heading, self.speed, self.gusting = heading, speed, gusting

    def toString(self):
        """Return "Wind{heading=..., speed=..., gusting=...}"."""
        pieces = (
            "Wind{heading=", self.heading,
            ", speed=", self.speed,
            ", gusting=", self.gusting,
            "}",
        )
        return "".join(pieces)
###########################################################
#
###########################################################
class Visibility:
    """Visibility = (DIGIT SP DIGIT '/' DIGIT | DIGIT '/' DIGIT | DIGIT | 'P6') 'SM'

    Keeps the distance text exactly as supplied (without the 'SM' suffix
    when it comes from Forecast.parse).
    """

    def __init__(self, distance=""):
        """Store the distance text; empty means not reported."""
        self.distance = distance

    def toString(self):
        """Return "Visibility{distance=...}"."""
        return "".join(("Visibility{distance=", self.distance, "}"))
###########################################################
#
###########################################################
class Condition:
    """Condition = ('FEW' | 'SCT' | 'BKN' | 'OVC') Altitude *1('CB') | 'SKC'

    One sky-condition group: the cover code, the digits that follow it and
    an optional "CB" marker, all stored as strings.
    """

    def __init__(self, condition="", altitude="", cb=""):
        """Store the cover code, altitude digits and cloud-type marker."""
        self.condition, self.altitude, self.cb = condition, altitude, cb

    def toString(self):
        """Return "Condition{condition=..., altitude=..., cb=...}"."""
        # NOTE(review): the original author's commented-out sketch multiplied
        # the altitude by 100 to print feet - confirm before relying on units.
        pieces = (
            "Condition{condition=", self.condition,
            ", altitude=", self.altitude,
            ", cb=", self.cb,
            "}",
        )
        return "".join(pieces)
class WeatherType(Base):
    """Marker base class for weather nodes; adds nothing to Base."""
###########################################################
#
###########################################################
class Weather(WeatherType):
    """Weather = *1Intensity *1Descriptor *1Percipitation *1Obscuration *1Other

    One weather group, broken into its five optional components.
    """

    def __init__(self, intensity=None, descriptor=None, percipitation=None, obscuration=None, other=None):
        """Store each component, creating an empty one for any left out."""
        self.intensity = Intensity() if intensity is None else intensity
        self.descriptor = Descriptor() if descriptor is None else descriptor
        self.percipitation = Percipitation() if percipitation is None else percipitation
        self.obscuration = Obscuration() if obscuration is None else obscuration
        self.other = Other() if other is None else other

    def toString(self):
        """Return a multi-line, tab-indented dump of the five components."""
        return ("Weather{\n\tintensity=" + self.intensity.toString()
                + ", \n\tdescriptor=" + self.descriptor.toString()
                + ", \n\tpercipitation=" + self.percipitation.toString()
                + ", \n\tobscuration=" + self.obscuration.toString()
                + ", \n\tother=" + self.other.toString()
                + "\n}")
###########################################################
#
###########################################################
class NSW(WeatherType):
    """Weather node created by Forecast.parse for an "NSW" token."""

    def toString(self):
        """Return the fixed text "NSW{}"."""
        text = "NSW{}"
        return text
###########################################################
#
###########################################################
class Intensity:
    """Intensity = '-' | '+' | 'VC'

    Holds the intensity prefix of a weather group; empty when absent.
    """

    def __init__(self, value=""):
        """Store the prefix text."""
        self.value = value

    def toString(self):
        """Return "Intensity{value=...}"."""
        return "".join(("Intensity{value=", self.value, "}"))
###########################################################
#
###########################################################
class Descriptor:
    """Descriptor = 'MI' | 'BC' | 'DR' | 'BL' | 'SH' | 'TS' | 'FZ' | 'PR'

    Holds the descriptor code of a weather group; empty when absent.
    """

    def __init__(self, value=""):
        """Store the descriptor code."""
        self.value = value

    def toString(self):
        """Return "Descriptor{value=...}"."""
        return "".join(("Descriptor{value=", self.value, "}"))
###########################################################
#
###########################################################
class Percipitation(Values):
    """Percipitation = 'DZ' | 'RA' | 'SN' | 'SG' | 'IC' | 'PL' | 'GR' | 'GS' | 'UP'

    The precipitation codes of one weather group, kept in a Values list.
    """

    def toString(self):
        """Return the Values dump wrapped in "Percipitation{...}"."""
        inner = Values.toString(self)
        return "Percipitation{" + inner + "}"
###########################################################
#
###########################################################
class Obscuration:
    """Obscuration = 'BR' | 'FG' | 'FU' | 'DU' | 'SA' | 'HZ' | 'PY' | 'VA'

    Holds the obscuration code of a weather group; empty when absent.
    """

    def __init__(self, value=""):
        """Store the obscuration code."""
        self.value = value

    def toString(self):
        """Return "Obscuration{value=...}"."""
        return "".join(("Obscuration{value=", self.value, "}"))
###########################################################
#
###########################################################
class Other:
    """Other = 'PO' | 'SQ' | 'FC' | 'SS' | 'DS'

    Holds the "other phenomena" code of a weather group; empty when absent.
    """

    def __init__(self, value=""):
        """Store the phenomenon code."""
        self.value = value

    def toString(self):
        """Return "Other{value=...}"."""
        return "".join(("Other{value=", self.value, "}"))
###########################################################
#
###########################################################
class WindShear:
    """WindShear = 'WS' 3*3DIGIT '/' 5*5DIGIT 'KT'

    Holds the three pieces of a wind-shear group as strings; an empty
    string means the piece is absent.
    """

    def __init__(self, heading="", speed="", altitude=""):
        """Store the heading, speed and altitude digits as given."""
        self.heading, self.speed, self.altitude = heading, speed, altitude

    def toString(self):
        """Return "WindShear{heading=..., speed=..., altitude=...}"."""
        # NOTE(review): the original author's commented-out sketch multiplied
        # the altitude by 100 to print feet - confirm before relying on units.
        pieces = (
            "WindShear{heading=", self.heading,
            ", speed=", self.speed,
            ", altitude=", self.altitude,
            "}",
        )
        return "".join(pieces)
###########################################################
#
###########################################################
class TranslateTAFReportVisitor(object):
    """Empty placeholder; no behaviour is implemented yet.

    NOTE(review): presumably meant to become a visitor that renders a
    TAFReport as readable text - confirm with the author.
    """
###########################################################
# Demo: parse two sample reports and dump the resulting structures.  After
# the loop, sampleTAF and report refer to the last sample.
for sampleTAF in (
    "TAF KATL 051740Z 0518/0624 24008KT P6SM SCT012 BKN035 TEMPO 0518/0520 -RA BKN012 FM052000 25006KT P6SM VCSH SCT025 BKN050 TEMPO 0520/0524 6SM -SHRA BR BKN020 FM060000 20005KT P6SM VCSH SCT015 BKN025 FM060400 16005KT 5SM -SHRA BR SCT005 OVC012 TEMPO 0605/0609 2SM SHRA BR OVC005 FM060900 19006KT 4SM -RA BR OVC007 FM061500 22008KT 6SM BR OVC015 FM061800 20010KT P6SM VCSH SCT015 BKN025",
    "TAF AMD KMHT 071510Z 0715/0812 02008KT 3/4SM -SNPL BR BKN007 OVC015 FM071700 05007KT 2SM -FZRAPL OVC007 FM072300 04006KT 3SM -FZRADZ OVC004 FM080500 VRB03KT 1SM BR OVC004",
):
    report = TAFReport()
    report.parse(sampleTAF)
    print(sampleTAF, "\n=========================================")
    print(report.toString())

# Download the KMHT station file over anonymous FTP and echo it to stdout.
# The with-block sends QUIT and closes the connection even when a command
# fails part-way through.
with ftplib.FTP('tgftp.nws.noaa.gov') as ftp:
    ftp.login('anonymous', '')
    ftp.cwd('/data/forecasts/taf/stations')
    # retrlines hands over each line without its line ending.  The original
    # callback called sys.stdout itself, which is not callable.
    ftp.retrlines("RETR KMHT.TXT", lambda line: sys.stdout.write(line + "\n"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment