Skip to content

Instantly share code, notes, and snippets.

@zperk13
Last active August 22, 2021 23:20
Show Gist options
  • Save zperk13/1f50d161bb750e785398cc8602faee02 to your computer and use it in GitHub Desktop.
Save zperk13/1f50d161bb750e785398cc8602faee02 to your computer and use it in GitHub Desktop.
Class for accessing NWS API
import requests
import typing
class NWSWeatherAPI:
def __init__(self, user_agent: str, longitude: str, latitude: str, ignore_test_alerts: bool = True, max_alert_cache_size: int = 256):
# Read about User-Agents here: https://www.weather.gov/documentation/services-web-api
self._user_agent_header = {'User-Agent': user_agent}
self._long_lat = f'{longitude},{latitude}'
self._api_url = 'https://api.weather.gov'
self._forecast_grid_url = requests.get(f'{self._api_url}/points/{self._long_lat}', headers=self._user_agent_header).json()['properties']['forecastGridData'] # type: str
self.ignore_test_alerts = ignore_test_alerts
self._alert_cache = []
self.max_alert_cache_size = max_alert_cache_size
def _request(self, path: str, use_forcast_grid: bool = False, **kwargs) -> dict:
parameters = ''
if len(kwargs) != 0:
parameters = '?'
for key in kwargs.keys():
parameters += f'{key}={kwargs[key]}&'
# '&' is the separator between parameters. you don't need a separator after the last parameter
parameters = parameters[:-1]
if path.startswith('/'):
path = path[1:]
if path.endswith('/'):
path = path[:-1]
if use_forcast_grid:
base_url = self._forecast_grid_url
else:
base_url = self._api_url
return requests.get(f'{base_url}/{path}{parameters}', headers=self._user_agent_header).json()
def raw_forecast(self, use_metric_units: bool = False) -> dict:
if use_metric_units:
return self._request('/forecast', True, units='si')
else:
return self._request('/forecast', True)
def filtered_forecast(self, use_metric_units: bool = False) -> dict:
forecast = self.raw_forecast(use_metric_units)['properties']['periods'][0] # type: dict
result = {}
for key in ['detailedForecast', 'shortForecast', 'temperature', 'windDirection', 'windSpeed']:
result[key] = forecast[key]
return result
def raw_hourly_forecast(self, use_metric_units: bool = False) -> dict:
if use_metric_units:
return self._request('/forecast/hourly', True, units='si')
else:
return self._request('/forecast/hourly', True)
def filtered_hourly_forecast(self, use_metric_units: bool = False) -> typing.List[dict]:
forecast = self.raw_forecast(use_metric_units)['properties']['periods'] # type: typing.List[dict]
result = []
for period in forecast:
current_period_result = {}
for key in ['endTime', 'shortForecast', 'startTime', 'temperature', 'temperatureTrend', 'windDirection', 'windSpeed']:
current_period_result[key] = period[key]
result.append(current_period_result)
return result
@staticmethod
def get_alert_id(alert: dict) -> str:
if 'features' in alert.keys():
return alert['features']['id']
else:
return alert['id']
def _alert_cache_ids(self) -> typing.List[str]:
return [self.get_alert_id(a) for a in self._alert_cache]
def _trim_alert_cache(self):
if len(self._alert_cache) > self.max_alert_cache_size:
self._alert_cache = self._alert_cache[len(self._alert_cache) - self.max_alert_cache_size:]
def _active_alerts_no_cache(self):
if self.ignore_test_alerts:
return self._request('/alerts/active', False, point=self._long_lat, status='actual')
else:
return self._request('/alerts/active', False, point=self._long_lat)
def raw_active_alerts(self) -> dict:
result = self._active_alerts_no_cache()
for alert in result['features']:
if self.get_alert_id(alert) not in self._alert_cache_ids():
self._alert_cache.append(alert)
self._trim_alert_cache()
return result
@staticmethod
def _filtered_alert(alert: dict) -> dict:
result = {}
for key in ['areaDesc', 'category', 'certainty', 'description', 'event', 'headline', 'id', 'instruction', 'messageType', 'onset', 'response', 'senderName', 'severity', 'status', 'urgency']:
result[key] = alert['properties'][key]
return result
def filtered_active_alerts(self) -> typing.List[dict]:
features = self.raw_active_alerts()['features'] # type: typing.List[dict]
result = []
for f in features:
result.append(self._filtered_alert(f))
return result
def raw_new_active_alerts(self, include_updates: bool = True) -> typing.Generator[dict, None, None]:
alerts = self._active_alerts_no_cache()['features']
for alert in alerts:
if self.get_alert_id(alert) not in self._alert_cache_ids():
if alert['properties']['messageType'] == 'Alert' or include_updates:
yield alert
self._alert_cache.append(alert)
elif include_updates:
for index, cached_alert in enumerate(self._alert_cache):
if self.get_alert_id(cached_alert) == self.get_alert_id(alert):
if cached_alert != alert:
self._alert_cache[index] = alert
yield alert
break
self._trim_alert_cache()
def filtered_new_active_alerts(self, include_updates: bool = True) -> typing.Generator[dict, None, None]:
for alert in self.raw_new_active_alerts(include_updates):
yield self._filtered_alert(alert)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment