Skip to content

Instantly share code, notes, and snippets.

@kk7ds
Last active January 3, 2022 03:46
Show Gist options
  • Save kk7ds/bfc3c14f758274217cd7528ec4f6168c to your computer and use it in GitHub Desktop.
Save kk7ds/bfc3c14f758274217cd7528ec4f6168c to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# Copyright 2021 Dan Smith <kk7ds+wxbeacon@danplanet.com>
#
# APRS NWS alert bulletin agent
#
# This attempts to provide one APRS bulletin message per execution for
# any NWS alerts active for the given area and zone(s). The expected
# usage is as an exec beacon for aprx, scheduled every ten minutes or
# so. Each new NWS alert will be emitted once as a bulletin until it
# is changed or updated.
import argparse
import collections
import datetime
import iso8601
import json
import logging
import pytz
import re
import requests
import sys
UTC = pytz.utc
LOG = logging.getLogger('wxbeacon')
# Default state, if none is loaded from disk.
DEFAULT_STATE = {
'timestamp': datetime.datetime.now(tz=UTC).timestamp(),
'_nostate': True,
}
# Substitutions we use to shorten the bulletin text so it fits under
# the limit. We apply these in order until the text is short enough.
COMPRESS = [
('...', ','),
('FEET', 'FT'),
('THROUGH', 'THRU'),
('EFFECT', 'EFF'),
('EVENING', 'EVE'),
('AFTERNOON', 'AFTN'),
('MORNING', 'MORN'),
('MIDNIGHT', '12AM'),
('NIGHT', 'NITE'),
('EVENING', 'EVE'),
('HAZARDOUS', 'HAZ'),
('COASTAL', 'COAST'),
('HIGH', 'HI'),
('CONDITIONS', 'CONDX'),
('WINTER', 'WNTR'),
('ADVISORY', 'ADVIS'),
('WARNING', 'WARN'),
('UNTIL', 'UNTL'),
('REMAINS', 'REM'),
('FROM', 'FRM'),
('ABOVE', 'ABV'),
]
DAYS = ['SUNDAY', 'MONDAY', 'TUESDAY', 'WEDNESDAY',
'THURSDAY', 'FRIDAY', 'SATURDAY']
class NOAAFailure(Exception):
pass
class State(collections.UserDict):
def __init__(self):
super(State, self).__init__(DEFAULT_STATE)
self.statefile = '/tmp/wxbeacon.state'
def load(self, statefile):
try:
self.statefile = statefile
with open(statefile) as f:
self.data = json.loads(f.read())
except FileNotFoundError:
LOG.warning('No statefile, assuming current')
except json.decoder.JSONDecodeError as e:
LOG.warning('Failed to parse statefile: %s' % e)
except Exception as e:
LOG.exception('Failed to read statefile: %s' % e)
raise
def save(self, statefile=None):
if not statefile:
statefile = self.statefile
LOG.debug('Writing state to %s' % statefile)
try:
with open(statefile, 'w') as f:
f.write(json.dumps(self.data))
except Exception as e:
LOG.exception('Failed to write statefile: %s' % e)
raise
def get_alerts(area, zones):
try:
resp = requests.get('https://api.weather.gov/alerts/active',
params={'area': area})
resp.raise_for_status()
except requests.RequestException as e:
raise NOAAFailure('Unable to contact NOAA: %s' % e)
data = resp.json()
now = datetime.datetime.now().astimezone(UTC)
relevant_alerts = []
for feature in data['features']:
properties = feature['properties']
if properties['messageType'] not in ('Alert', 'Update'):
LOG.debug('Ignoring alert type %r' % properties['messageType'])
continue
if properties['status'] not in ('Actual',):
LOG.debug('Ignoring alert status %r' % properties['status'])
continue
expires = iso8601.parse_date(properties['expires'])
if expires < now:
LOG.debug('Ignoring alert expired at %s' % expires)
continue
if zones & set(properties['geocode']['UGC']):
LOG.info('Found relevant alert: %s (expires %s)' % (
properties['event'], expires))
relevant_alerts.append(feature)
else:
LOG.debug('Ignoring alert for %s' % (
','.join(properties['geocode']['UGC'])))
return relevant_alerts
def make_bulletin(number, text):
LOG.debug('Original alert text: %r' % text)
group = 'WXA'
maxl = 67
# Clean up some punctuation conventions
text = text.replace('~', '').replace('|', '').replace('...', ', ')
# Compress day names always - no reason to waste the bytes
for day in DAYS:
text = text.replace(day, day[:3])
# Compress things like '4 AM' to '4AM'
text = re.sub(r'\s([0-9]+) ((A|P)M)\s', r' \1\2 ', text)
# Remove timezone references
text = re.sub(r'\s(P|C|E)(ST|DT)\s', ' ', text)
# While we're too long, apply compression substitutions in order
# until we are under the limit. They get more aggressive, so don't
# apply any we don't need.
compress = list(COMPRESS)
while compress and len(text) > maxl:
word, subst = compress.pop(0)
text = text.replace(word, subst)
LOG.debug('Shortened %r' % text)
return ':BLN%s%-5s:%s' % (number[0], group, text[:maxl])
def update_state_from_alert_ts(ts, state):
dt = iso8601.parse_date(ts)
state['timestamp'] = dt.astimezone(UTC).timestamp()
LOG.debug('Updating state timestamp to %s' % dt)
def digest_alerts(state, alerts):
state.setdefault('alerts', {})
current_ids = set([alert['properties']['id'] for alert in alerts])
stored_ids = set(state['alerts'].keys())
# Delete records for alerts we are tracking but that are no longer active
# FIXME: Should we keep this and beacon once as empty?
expired_numbers = {}
for expired in stored_ids - current_ids:
LOG.info('Expiring old alert %r with number %s' % (
state['alerts'][expired]['headline'],
state['alerts'][expired]['number']))
expired_numbers[expired] = state['alerts'][expired]['number']
del state['alerts'][expired]
state.save()
# Figure out which bulletin numbers are being used and which we can assign
active_numbers = set([alert['number']
for alert in state['alerts'].values()])
all_numbers = set(str(x) for x in range(10)) | \
set([chr(x) for x in range(ord('A'), ord('Z') + 1)])
# Do not assign numbers we are already using, or things we just expired
# as they may be re-used by updates we have yet to parse
available_numbers = [str(x) for x in (all_numbers - active_numbers -
set(expired_numbers.values()))]
available_numbers.sort()
LOG.debug('Active bulletin numbers: %s',
','.join(str(x) for x in active_numbers))
LOG.debug('Available bulletin numbers: %s',
','.join(str(x) for x in available_numbers))
for alert in alerts:
alert_id = alert['properties']['id']
if alert_id in state['alerts']:
LOG.debug('Active known alert %s has number %s' % (
alert['properties']['parameters']['NWSheadline'][0],
state['alerts'][alert_id]['number']))
continue
refs = set([ref['identifier']
for ref in alert['properties']['references']])
# Pick a persistent number for this alert, either one from a
# previously-referenced alert, or a new one from the available
# list
try:
prev_alerts = list(refs & set(expired_numbers.keys()))
if prev_alerts:
number = expired_numbers[prev_alerts[0]]
LOG.debug('Found update alert for previous bulletin %s;'
' reusing', number)
else:
number = available_numbers.pop(0)
except KeyError:
LOG.warning('No alert numbers available for %r' % (
alert['properties']['parameters']['NWSheadline'][0]))
continue
try:
# Some alerts have no NWSheadline (Hydroloic Outlook, for
# example). There is nothing for us to send on those, so
# ignore.
if 'NWSheadline' not in alert['properties']['parameters']:
LOG.info('Alert %r has no headline; skipping',
alert['properties']['event'])
continue
headline = alert['properties']['parameters']['NWSheadline'][0]
# Sometimes NWS issues multiple alerts for an area with the same
# text. If this happens, do not digest the duplicates.
if headline in set(x['headline']
for x in state['alerts'].values()):
LOG.info('Alert %r is a duplicate string; skipping',
headline)
continue
# Add new alert to our records
state['alerts'][alert_id] = {
'transmitted': 0,
'headline': headline,
'sent': alert['properties']['sent'],
'severity': alert['properties']['severity'],
'number': number,
}
except KeyError as e:
LOG.error('Missing key in alert: %s - %s' % (
e, alert['properties']))
continue
LOG.info('Tracking new alert %r sent at %s number %s' % (
alert['properties']['parameters']['NWSheadline'][0],
iso8601.parse_date(alert['properties']['sent']),
number))
state.save()
def get_alert(state, area, zones, intervals):
"""Get an alert to beacon, if appropriate.
:param state: A State object loaded from file
:param area: The NWS area string (i.e. 'OR')
:param zones: A set of NWS zone strings to limit the alerts
:param intervals: A dict of severity:interval limits to decide how often
to re-emit alerts (may be empty).
:returns: A string representing the bulletin-formatted payload to emit,
or None if no alerts are due
"""
alerts = get_alerts(area, zones)
digest_alerts(state, alerts)
now = datetime.datetime.now(tz=UTC).timestamp()
most_in_need = sorted(state['alerts'].values(),
key=lambda a: a['transmitted'])
for alert in most_in_need:
interval = intervals.get(alert.get('severity', 'Minor'))
age = now - alert['transmitted']
if alert['transmitted'] and (not interval or age < interval):
LOG.debug('%s alert %r number %s not due at %i < %s seconds',
alert['severity'], alert['headline'], alert['number'],
age, interval or 'forever')
continue
bln = make_bulletin(alert['number'], alert['headline'])
alert['transmitted'] = now
update_state_from_alert_ts(alert['sent'], state)
state.save()
LOG.info('Will beacon %r' % bln)
return bln
# Only beacon one alert at a time; be conservative over completeness
break
def print_alert(state, area, zones, intervals):
alert = get_alert(state, area, zones, intervals)
if alert:
print(alert)
else:
# aprx requires a blank line to avoid logging an error
print('')
def main():
logging.basicConfig(level=logging.ERROR)
parser = argparse.ArgumentParser()
parser.add_argument('--all', help='Assume all alerts are new',
action='store_true')
parser.add_argument('--area', help='NWS alert area',
default='OR')
parser.add_argument('--zones', help='Filter alerts for these zones',
default=['ORC005', 'ORZ006'],
nargs='*')
parser.add_argument('--statefile', help='Use alternate state file',
default='/tmp/wxbeacon.state')
parser.add_argument('--debug', help='Enable debug output',
action='store_true')
parser.add_argument('-v', '--verbose', help='Verbose logging output',
action='store_true')
parser.add_argument('--log', help='Verbose logging to this file',
default=None)
parser.add_argument('--nosave', help='Do not update state',
action='store_true')
parser.add_argument('--severe-interval', type=int, default=None,
help='Resend severe alerts every this many minutes')
parser.add_argument('--moderate-interval', type=int, default=None,
help='Resend moderate alerts every this many minutes')
args = parser.parse_args()
if args.log:
handler = logging.FileHandler(args.log)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler.setFormatter(formatter)
LOG.addHandler(handler)
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
elif args.verbose:
logging.getLogger().setLevel(logging.INFO)
intervals = {
'Severe': args.severe_interval and 60 * args.severe_interval,
'Moderate': args.moderate_interval and 60 * args.moderate_interval,
}
state = State()
state.load(args.statefile)
if args.nosave:
state.statefile = '/dev/null'
if args.all:
for alert in state['alerts'].values():
LOG.info('Ignoring stored threshold %s for %r number %s' % (
datetime.datetime.fromtimestamp(alert['transmitted']),
alert['headline'], alert['number']))
alert['transmitted'] = 0
state['timestamp'] = 0
print_alert(state, args.area, set(args.zones), intervals)
if state.pop('_nostate', None):
LOG.info('Writing state because it was missing')
state.save()
if __name__ == '__main__':
try:
main()
except NOAAFailure as e:
LOG.error(e)
sys.exit(1)
except Exception as e:
LOG.exception('Failed to run: %s' % e)
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment