Created
June 24, 2020 23:14
-
-
Save Nullpo1nt/70109f56fa47fd7e0fb924ac1bbbd508 to your computer and use it in GitHub Desktop.
Parses NOAA TAF reports into basic structure.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import ftplib | |
import sys | |
cloudType = {"CB": "Cumulonimbus"} | |
skyCondition = {"SKC": "Sky clear", "FEW": "Few", "SCT": "Scattered", "BKN": "Broken", "OVC": "Overcast"} | |
intensity = {"-": "Light", " ": "Moderate", "+": "Heavy", "VC": "Vicinity"} | |
qualifier = {"MI": "Shallow", "BC": "Patches", "DR": "Low Drifting", "BL": "Blowing", "SH": "Showers", "TS": "Thunderstorm", "FZ": "Freezing", "PR": "Partial"} | |
precipitation = {"DZ": "Drizzle", "RA": "Rain", "SN": "Snow", "SG": "Snow Grains", "IC": "Ice Crystals", "PL": "Ice Pellets", "GR": "Hail", | |
"GS": "Small Hail or Snow Pellets", # (less than 1/4 inch in diameter)", | |
"UP": "Unknown precipitation"} | |
obscuration = {"BR": "Mist", # (Foggy conditions with visibilities greater than 5/8 statute mile)", | |
"FG": "Fog", # (visibility 5/8 statute mile or less)", | |
"FU": "Smoke", "DU": "Dust", "SA": "Sand", "HZ": "Haze", "PY": "Spray", "VA": "Volcanic Ash"} | |
other = {"PO": "Well-Developed Dust/Sand Whirls", "SQ": "Squalls", "FC": "Funnel Cloud", "SS": "Sandstorm", "DS": "Duststorm"} | |
########################################################### | |
# | |
########################################################### | |
def getDateTime(s): | |
#if s =~ /(\d{2})(\d{2})(\d{2})/): | |
# s = "$2:00Z to $3:00Z" | |
#elif s =~ /(\d{2})(\d{2})\/(\d{2})(\d{2})/: | |
# s = "$2:00Z to $4:00Z" | |
#elif s =~ /(\d{2})(\d{2})/: | |
# s = "$1:00Z to $2:00Z" | |
return s | |
########################################################### | |
# | |
########################################################### | |
class Base(object): | |
def toString(self): | |
return "?Base{}?" | |
def parse(self, s): | |
pass | |
def filterSplit(self, list): | |
"Remove empty (or white chars only) elements from a list" | |
result = [] | |
for item in list: | |
if not(re.match('^\s*$', item)): | |
result.append(item) | |
return result | |
def normalize(self, s): | |
# normalize | |
s = re.sub('=.*', '', s) | |
s = re.sub('^\s+|\s+$', '', s) | |
s = re.sub('\s+', ' ', s) | |
s = s.upper() | |
# Remove anything before the first "TAF" and the first | |
# "=" or end of string | |
#s = re.sub('^.*(TAF.*?=?)$', '\1', s) | |
return s | |
########################################################### | |
# | |
########################################################### | |
class Values(Base): | |
def __init__(self, values=None): | |
if values == None: | |
values = [] | |
self.values = values | |
def addValue(self, value): | |
self.values.append(value) | |
def getValues(self): | |
return self.values | |
def getValue(self, pos): | |
if pos < len(self.values) and pos >= 0: | |
return self.values[pos] | |
return None | |
def toString(self): | |
s = [] | |
for i in self.values: | |
s.append("value=") | |
s.append("i") | |
s.append(", ") | |
return ''.join(s) | |
########################################################### | |
# | |
########################################################### | |
class TAFReport(Base): | |
"TAFReport = 'TAF' TAF SP 1*RAF SP '='" | |
def __init__(self, taf=None, rafs=None): | |
if taf == None: | |
taf = TAF() | |
self.taf = taf | |
if rafs == None: | |
rafs = [] | |
self.rafs = rafs | |
def parse(self, s): | |
"Parses a TAF string and returns a TAF object." | |
taf = self.normalize(s) | |
# Split into blocks based on group identifiers | |
reports = re.split('\s*(TEMPO|FM\d+|PROB\d+|BECMG)', taf) | |
r = [] | |
for item in reports: | |
if re.search('^\s+', item): | |
item = r.pop() + item | |
r.append(item) | |
for item in r: | |
if re.match('^TAF\s+', item): | |
self.taf.parse(item) | |
elif re.match('^TEMPO\s+', item): | |
temporary = Temporary() | |
temporary.parse(item) | |
self.rafs.append(temporary) | |
elif re.match('^FM\d+\s+', item): | |
fm = From() | |
fm.parse(item) | |
self.rafs.append(fm) | |
elif re.match('^PROB\d+\s+', item): | |
prob = Probability() | |
prob.parse(item) | |
self.rafs.append(prob) | |
elif re.match('^BECMG\s+', item): | |
becoming = Becoming() | |
becoming.parse(item) | |
self.rafs.append(becoming) | |
else: | |
print("\tUnknown format: ", item) | |
def toString(self): | |
s = "TAFReport{\ntaf=" + self.taf.toString() | |
for item in self.rafs: | |
s += "\nrafs[]=" + item.toString() | |
return s + "\n}"; | |
########################################################### | |
# | |
########################################################### | |
class TAF(Base): | |
"TAF = [SP 'AMD'] SP ICAO SP IssuedTime SP ValidRange SP Forecast" | |
def __init__(self, icao="", ammended="", issueTime="", validTimeFrom="", validTimeTo="", forecast=None): | |
self.icao = icao | |
self.ammended = ammended | |
self.issueTime = issueTime | |
self.validTimeFrom = validTimeFrom | |
self.validTimeTo = validTimeTo | |
if (forecast == None): | |
forecast = Forecast() | |
self.forecast = forecast | |
def parse(self, s): | |
r = re.split('(TAF\s+.*?\s+\d+(?:\/\d+)?)\s+', s) | |
r = self.filterSplit(r); | |
if len(r) == 2: | |
taf = r[0]; | |
match = re.search('TAF\s+(?:(AMD)\s+)?(\w{4})\s+(\d{2})(\d{2})(\d{2})Z\s+(\d{2})(\d{2})(\d{2})', taf) | |
if match: | |
self.icao = match.group(2) | |
if match.group(1): | |
self.ammended = "AMD" | |
self.issueTime = match.group(3)+" "+match.group(4)+":"+match.group(5) | |
self.validTimeFrom = match.group(6)+" "+match.group(7)+":00" | |
self.validTimeTo = (match.group(6)+1)+" "+match.group(8)+":00" | |
match = re.search('TAF\s+(?:(AMD)\s+)?(\w{4})\s+(\d{2})(\d{2})(\d{2})Z\s+(\d{2})(\d{2})\/(\d{2})(\d{2})', taf) | |
if match: | |
self.icao = match.group(2) | |
if match.group(1): | |
self.ammended = "AMD" | |
self.issueTime = match.group(3)+" "+match.group(4)+":"+match.group(5) | |
self.validTimeFrom = match.group(6)+" "+match.group(7)+":00" | |
self.validTimeTo = match.group(8)+" "+match.group(9)+":00" | |
self.forecast.parse(r[1]) | |
def toString(self): | |
s = "TAF{\nicao="+self.icao | |
s += ", \nammended="+self.ammended | |
s += ", \nissueTime="+self.issueTime | |
s += ", \nvalidTimeFrom="+self.validTimeFrom | |
s += ", \nvalidTimeTo="+self.validTimeTo | |
s += ", \nforecast="+self.forecast.toString() | |
return s + "\n}" | |
########################################################### | |
# | |
########################################################### | |
class RAF(Base): | |
"RAF = (Probability | From | Temporary | Becoming) SP Forecast" | |
def __init__(self, fromTime="", toTime="", forecast=None): | |
self.fromTime = fromTime | |
self.toTime = toTime | |
if forecast == None: | |
forecast = Forecast() | |
self.forecast = forecast | |
def parse(self, s): | |
pass | |
def toString(self): | |
return "RAF{}" | |
########################################################### | |
# | |
########################################################### | |
class Probability(RAF): | |
"Probability = 'PROB' 2*2 DIGIT SP 4*4DIGIT" | |
def __init__(self, probability=-1): | |
self.probability = probability | |
def parse(self, s): | |
r = re.split('(PROB\d{2}\s+\d+(?:\/\d+)?)\s+', s) | |
r = self.filterSplit(r) | |
if len(r) == 2: | |
match = re.search('PROB(\d+)\s+(\d+(?:\/\d+)?)', r[0]) | |
if match: | |
self.probability = match.group(1) | |
self.fromTime = match.group(2) | |
self.forecast = Forecast.parse(r[1]) | |
def toString(self): | |
#print("\t", getDateTime(match.group(2)), ", $1% chance\n") | |
s = "Probability{\nprobability="+self.probability | |
s += ", \nfromTime="+self.fromTime | |
s += ", \nforecast="+self.forecast.toString() | |
return s + "\n}" | |
########################################################### | |
# | |
########################################################### | |
class From(RAF): | |
"From = 'FM' 4*4DIGIT" | |
def parse(self, s): | |
r = re.split('(FM\d+\s+)', s) | |
r = self.filterSplit(r) | |
if len(r) == 2: | |
match = re.search('FM(\d+)', r[0]) | |
if match: | |
self.fromTime = match.group(1) | |
self.forecast.parse(r[1]) | |
def toString(self): | |
#print("\t", getDateTime(mathc.group(1)), ", rapidly becoming\n") | |
s = "From{\nfromTime="+self.fromTime | |
s += ", \nforecast="+self.forecast.toString() | |
return s + "\n}" | |
########################################################### | |
# | |
########################################################### | |
class Temporary(RAF): | |
"Temporary = 'TEMPO' SP 4*4DIGIT" | |
def parse(self, s): | |
r = re.split('(TEMPO\s+\d+(?:\/\d+)?)\s+', s) | |
r = self.filterSplit(r) | |
if len(r) == 2: | |
match = re.match('TEMPO\s+(\d+(?:\/\d+)?)', r[0]) | |
if match: | |
self.fromTime = match.group(1) | |
self.forecast.parse(r[1]) | |
def toString(self): | |
#print("\t", getDateTime(match.group(1)), ", occasional\n") | |
s = "Temporary{\nfromTime="+self.fromTime | |
s += ", \nforecast="+self.forecast.toString() | |
return s + "\n}" | |
########################################################### | |
# | |
########################################################### | |
class Becoming(RAF): | |
"Becoming = 'BECMG' SP 4*4DIGIT" | |
def parse(self, s): | |
r = re.split('(BECMG\s+\d+(?:\/\d+)?)\s+', s) | |
r = self.filterSplit(r) | |
if len(r) == 2: | |
match = re.match('BECMG\s+(\d+(?:\/\d+)?)', r[0]) | |
if match: | |
self.fromTime = match.group(1) | |
self.forecast = Forecast.parse(r[1]) | |
def toString(self): | |
#print("\t", getDateTime(match.group(1)), ", becoming\n") | |
s = "Becoming{\nfromTime="+self.fromTime | |
s += ", \nforecast="+self.forecast.toString() | |
return s + "\n}" | |
########################################################### | |
# | |
########################################################### | |
class Forecast(Base): | |
"Forecast = Wind [SP Visibility] 0*(SP Weather) 0*(SP Condition) [SP WindShear]" | |
def __init__(self, wind=None, visibility=None, weather=None, condition=None, windshear=None): | |
if wind == None: | |
wind = Wind() | |
self.wind = wind | |
if visibility == None: | |
visibility = Visibility() | |
self.visibility = visibility | |
if weather == None: | |
weather = [] | |
self.weather = weather | |
if condition == None: | |
condition = [] | |
self.condition = condition | |
if windshear == None: | |
windshear = WindShear() | |
self.windshear = windshear | |
def parse(self, s): | |
forecast = re.split('\s*((?:P6|[0-6]|(?:\d\s+)?\d\/\d)SM|(?:(?:FEW|SCT|BKN|OVC)\d{3}(?:CB)?|SKC)|WS\d+\/\d+KT|(?:-|\+|VC)?(?:MI|BC|DR|BL|SH|TS|FZ|PR|DZ|RA|SN|SG|IC|PL|GR|GS|UP|BR|FG|FU|DU|SA|HZ|PY|VA|PO|SQ|FC|SS|DS|NSW)+)\s*', s) | |
forecast = self.filterSplit(forecast) | |
for item in forecast: | |
match = re.match('^(\d{3}|VRB)(\d+)(?:G(\d+))?KT', item) | |
if match: | |
self.wind = Wind(match.group(1), match.group(2)) | |
if match.group(3): | |
self.wind.gusting = match.group(3) | |
continue | |
match = re.match('^(P6|(?:[0-6]\s+)?\d\/\d|[0-6])SM', item) | |
if match: | |
self.visibility = Visibility(match.group(1)) | |
continue | |
match = re.match('^(SKC)|(FEW|SCT|BKN|OVC)(\d{3})(CB)?', item) | |
if match: | |
if match.group(1): | |
condition = Condition(match.group(1)) | |
self.condition.append(condition) | |
else: | |
condition = Condition(match.group(2), match.group(3)) | |
if match.group(4): | |
condition.cb = match.group(4) | |
self.condition.append(condition) | |
continue | |
match = re.match('^(-|\+|VC)?(MI|BC|DR|BL|SH|TS|FZ|PR)?((?:DZ|RA|SN|SG|IC|PL|GR|GS|UP)*)(BR|FG|FU|DU|SA|HZ|PY|VA)?(PO|SQ|FC|SS|DS)?(NSW)?$', item) | |
if match: | |
if match.group(6): | |
self.weather.append(NSW()) | |
elif match.group(1) or match.group(2) or match.group(3) or match.group(4) or match.group(5): | |
w = Weather() | |
self.weather.append(w) | |
if match.group(1): | |
w.intensity.value = match.group(1) | |
if match.group(2): | |
w.descriptor.value = match.group(2) | |
if match.group(3): | |
r = re.split('(\w{2})', match.group(3)) | |
print("precip len: ", len(r)) | |
for c in r: | |
if len(c) > 0: | |
print(c) | |
w.percipitation.addValue(c) | |
if match.group(4): | |
w.obscuration.value = match.group(4) | |
if match.group(5): | |
w.other.value = match.group(5) | |
continue | |
match = re.match('^WS(\d+)\/(\d{3})(\d+)KT', item) | |
if match: | |
self.windshear = WindShear(match.group(2), match.group(3), match.group(1)) | |
continue | |
print(">>>>>>>>> Forecast error, unkown format: \"$_\" <<<<<<<<<<<") | |
def toString(self): | |
s = "Forecast{\nwind="+self.wind.toString() | |
s += ", \nvisibility="+self.visibility.toString() | |
for i in self.weather: | |
s += ", \nweather[]=" + i.toString() | |
for i in self.condition: | |
s += ", \ncondition[]=" + i.toString() | |
s += ", \nwindshear="+self.windshear.toString() | |
return s + "\n}" | |
########################################################### | |
# | |
########################################################### | |
class Wind: | |
"Wind = Direction Speed ['G' Speed] 'KT'" | |
def __init__(self, heading="", speed="", gusting=""): | |
self.heading = heading | |
self.speed = speed | |
self.gusting = gusting | |
def toString(self): | |
s = "Wind{heading=" + self.heading | |
s += ", speed=" + self.speed | |
s += ", gusting=" + self.gusting | |
return s + "}" | |
########################################################### | |
# | |
########################################################### | |
class Visibility: | |
"Visibility = (DIGIT SP DIGIT '/' DIGIT | DIGIT '/' DIGIT | DIGIT | 'P6') 'SM'" | |
def __init__(self, distance=""): | |
self.distance = distance | |
def toString(self): | |
#if match.group(1): | |
# print("greater than 6") | |
#else: | |
# print("$2") | |
#print(" SM") | |
return "Visibility{distance="+self.distance+"}" | |
########################################################### | |
# | |
########################################################### | |
class Condition: | |
"Condition = ('FEW' | 'SCT' | 'BKN' | 'OVC') Altitude *1('CB') | 'SKC'" | |
def __init__(self, condition="", altitude="", cb=""): | |
self.condition = condition | |
self.altitude = altitude | |
self.cb = cb | |
def toString(self): | |
# if (match.group(1)): | |
# print(skyCondition[match.group(1)]) | |
# else: | |
# print(skyCondition[match.group(2)], " ") | |
# if match.group(4): | |
# print(cloudType[match.group(4)], " ") | |
# print((match.group(3)*100), " FT") | |
s = "Condition{condition=" + self.condition | |
s += ", altitude=" + self.altitude | |
s += ", cb=" + self.cb | |
return s + "}" | |
class WeatherType(Base): | |
pass | |
########################################################### | |
# | |
########################################################### | |
class Weather(WeatherType): | |
"Weather = *1Intensity *1Descriptor *1Percipitation *1Obscuration *1Other" | |
def __init__(self, intensity=None, descriptor=None, percipitation=None, obscuration=None, other=None): | |
if intensity == None: | |
intensity = Intensity() | |
self.intensity = intensity | |
if descriptor == None: | |
descriptor = Descriptor() | |
self.descriptor = descriptor | |
if percipitation == None: | |
percipitation = Percipitation() | |
self.percipitation = percipitation | |
if obscuration == None: | |
obscuration = Obscuration() | |
self.obscuration = obscuration | |
if other == None: | |
other = Other() | |
self.other = other | |
def toString(self): | |
s = "Weather{\n\tintensity="+self.intensity.toString() | |
s += ", \n\tdescriptor=" + self.descriptor.toString() | |
s += ", \n\tpercipitation=" + self.percipitation.toString() | |
s += ", \n\tobscuration=" + self.obscuration.toString() | |
s += ", \n\tother=" + self.other.toString() | |
return s + "\n}" | |
########################################################### | |
# | |
########################################################### | |
class NSW(WeatherType): | |
def toString(self): | |
return "NSW{}" | |
########################################################### | |
# | |
########################################################### | |
class Intensity: | |
"Intensity = '-' | '+' | 'VC'" | |
def __init__(self, value=""): | |
self.value = value | |
def toString(self): | |
return "Intensity{value="+self.value+"}" | |
########################################################### | |
# | |
########################################################### | |
class Descriptor: | |
"Descriptor = 'MI' | 'BC' | 'DR' | 'BL' | 'SH' | 'TS' | 'FZ' | 'PR'" | |
def __init__(self, value=""): | |
self.value = value | |
def toString(self): | |
return "Descriptor{value="+self.value+"}" | |
########################################################### | |
# | |
########################################################### | |
class Percipitation(Values): | |
"Percipitation = 'DZ' | 'RA' | 'SN' | 'SG' | 'IC' | 'PL' | 'GR' | 'GS' | 'UP'" | |
def toString(self): | |
return ''.join(["Percipitation{", Values.toString(self), "}"]) | |
########################################################### | |
# | |
########################################################### | |
class Obscuration: | |
"Obscuration = 'BR' | 'FG' | 'FU' | 'DU' | 'SA' | 'HZ' | 'PY' | 'VA'" | |
def __init__(self, value=""): | |
self.value = value | |
def toString(self): | |
return "Obscuration{value="+self.value+"}" | |
########################################################### | |
# | |
########################################################### | |
class Other: | |
"Other = 'PO' | 'SQ' | 'FC' | 'SS' | 'DS'" | |
def __init__(self, value=""): | |
self.value = value | |
def toString(self): | |
return "Other{value="+self.value+"}" | |
########################################################### | |
# | |
########################################################### | |
class WindShear: | |
"WindShear = 'WS' 3*3DIGIT '/' 5*5DIGIT 'KT'" | |
def __init__(self, heading="", speed="", altitude=""): | |
self.heading = heading | |
self.speed = speed | |
self.altitude = altitude | |
def toString(self): | |
#print("Wind shear ", (match.group(1)*100), " FT, $2 @ $3 KT") | |
s = "WindShear{heading=" + self.heading | |
s += ", speed=" + self.speed | |
s += ", altitude=" + self.altitude | |
return s + "}" | |
########################################################### | |
# | |
########################################################### | |
class TranslateTAFReportVisitor(object): | |
pass | |
########################################################### | |
sampleTAF = "TAF KATL 051740Z 0518/0624 24008KT P6SM SCT012 BKN035 TEMPO 0518/0520 -RA BKN012 FM052000 25006KT P6SM VCSH SCT025 BKN050 TEMPO 0520/0524 6SM -SHRA BR BKN020 FM060000 20005KT P6SM VCSH SCT015 BKN025 FM060400 16005KT 5SM -SHRA BR SCT005 OVC012 TEMPO 0605/0609 2SM SHRA BR OVC005 FM060900 19006KT 4SM -RA BR OVC007 FM061500 22008KT 6SM BR OVC015 FM061800 20010KT P6SM VCSH SCT015 BKN025" | |
report = TAFReport() | |
report.parse(sampleTAF) | |
print(sampleTAF, "\n=========================================") | |
print(report.toString()) | |
sampleTAF = "TAF AMD KMHT 071510Z 0715/0812 02008KT 3/4SM -SNPL BR BKN007 OVC015 FM071700 05007KT 2SM -FZRAPL OVC007 FM072300 04006KT 3SM -FZRADZ OVC004 FM080500 VRB03KT 1SM BR OVC004" | |
report = TAFReport() | |
report.parse(sampleTAF) | |
print(sampleTAF, "\n=========================================") | |
print(report.toString()) | |
ftp = ftplib.FTP('tgftp.nws.noaa.gov') | |
ftp.login('anonymous', '') | |
ftp.cwd('/data/forecasts/taf/stations'); | |
ftp.retrlines("RETR KMHT.TXT", lambda s, w=sys.stdout: w(s+"\n")) | |
ftp.quit() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment