Last active
August 22, 2021 23:20
-
-
Save zperk13/1f50d161bb750e785398cc8602faee02 to your computer and use it in GitHub Desktop.
Class for accessing NWS API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import typing | |
class NWSWeatherAPI: | |
def __init__(self, user_agent: str, longitude: str, latitude: str, ignore_test_alerts: bool = True, max_alert_cache_size: int = 256): | |
# Read about User-Agents here: https://www.weather.gov/documentation/services-web-api | |
self._user_agent_header = {'User-Agent': user_agent} | |
self._long_lat = f'{longitude},{latitude}' | |
self._api_url = 'https://api.weather.gov' | |
self._forecast_grid_url = requests.get(f'{self._api_url}/points/{self._long_lat}', headers=self._user_agent_header).json()['properties']['forecastGridData'] # type: str | |
self.ignore_test_alerts = ignore_test_alerts | |
self._alert_cache = [] | |
self.max_alert_cache_size = max_alert_cache_size | |
def _request(self, path: str, use_forcast_grid: bool = False, **kwargs) -> dict: | |
parameters = '' | |
if len(kwargs) != 0: | |
parameters = '?' | |
for key in kwargs.keys(): | |
parameters += f'{key}={kwargs[key]}&' | |
# '&' is the separator between parameters. you don't need a separator after the last parameter | |
parameters = parameters[:-1] | |
if path.startswith('/'): | |
path = path[1:] | |
if path.endswith('/'): | |
path = path[:-1] | |
if use_forcast_grid: | |
base_url = self._forecast_grid_url | |
else: | |
base_url = self._api_url | |
return requests.get(f'{base_url}/{path}{parameters}', headers=self._user_agent_header).json() | |
def raw_forecast(self, use_metric_units: bool = False) -> dict: | |
if use_metric_units: | |
return self._request('/forecast', True, units='si') | |
else: | |
return self._request('/forecast', True) | |
def filtered_forecast(self, use_metric_units: bool = False) -> dict: | |
forecast = self.raw_forecast(use_metric_units)['properties']['periods'][0] # type: dict | |
result = {} | |
for key in ['detailedForecast', 'shortForecast', 'temperature', 'windDirection', 'windSpeed']: | |
result[key] = forecast[key] | |
return result | |
def raw_hourly_forecast(self, use_metric_units: bool = False) -> dict: | |
if use_metric_units: | |
return self._request('/forecast/hourly', True, units='si') | |
else: | |
return self._request('/forecast/hourly', True) | |
def filtered_hourly_forecast(self, use_metric_units: bool = False) -> typing.List[dict]: | |
forecast = self.raw_forecast(use_metric_units)['properties']['periods'] # type: typing.List[dict] | |
result = [] | |
for period in forecast: | |
current_period_result = {} | |
for key in ['endTime', 'shortForecast', 'startTime', 'temperature', 'temperatureTrend', 'windDirection', 'windSpeed']: | |
current_period_result[key] = period[key] | |
result.append(current_period_result) | |
return result | |
@staticmethod | |
def get_alert_id(alert: dict) -> str: | |
if 'features' in alert.keys(): | |
return alert['features']['id'] | |
else: | |
return alert['id'] | |
def _alert_cache_ids(self) -> typing.List[str]: | |
return [self.get_alert_id(a) for a in self._alert_cache] | |
def _trim_alert_cache(self): | |
if len(self._alert_cache) > self.max_alert_cache_size: | |
self._alert_cache = self._alert_cache[len(self._alert_cache) - self.max_alert_cache_size:] | |
def _active_alerts_no_cache(self): | |
if self.ignore_test_alerts: | |
return self._request('/alerts/active', False, point=self._long_lat, status='actual') | |
else: | |
return self._request('/alerts/active', False, point=self._long_lat) | |
def raw_active_alerts(self) -> dict: | |
result = self._active_alerts_no_cache() | |
for alert in result['features']: | |
if self.get_alert_id(alert) not in self._alert_cache_ids(): | |
self._alert_cache.append(alert) | |
self._trim_alert_cache() | |
return result | |
@staticmethod | |
def _filtered_alert(alert: dict) -> dict: | |
result = {} | |
for key in ['areaDesc', 'category', 'certainty', 'description', 'event', 'headline', 'id', 'instruction', 'messageType', 'onset', 'response', 'senderName', 'severity', 'status', 'urgency']: | |
result[key] = alert['properties'][key] | |
return result | |
def filtered_active_alerts(self) -> typing.List[dict]: | |
features = self.raw_active_alerts()['features'] # type: typing.List[dict] | |
result = [] | |
for f in features: | |
result.append(self._filtered_alert(f)) | |
return result | |
def raw_new_active_alerts(self, include_updates: bool = True) -> typing.Generator[dict, None, None]: | |
alerts = self._active_alerts_no_cache()['features'] | |
for alert in alerts: | |
if self.get_alert_id(alert) not in self._alert_cache_ids(): | |
if alert['properties']['messageType'] == 'Alert' or include_updates: | |
yield alert | |
self._alert_cache.append(alert) | |
elif include_updates: | |
for index, cached_alert in enumerate(self._alert_cache): | |
if self.get_alert_id(cached_alert) == self.get_alert_id(alert): | |
if cached_alert != alert: | |
self._alert_cache[index] = alert | |
yield alert | |
break | |
self._trim_alert_cache() | |
def filtered_new_active_alerts(self, include_updates: bool = True) -> typing.Generator[dict, None, None]: | |
for alert in self.raw_new_active_alerts(include_updates): | |
yield self._filtered_alert(alert) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment