Last active
May 16, 2023 12:05
-
-
Save joe248/1d8e42dd6904d480770f67f2bb640a64 to your computer and use it in GitHub Desktop.
Home Assistant PJM Load Data Sensor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""The PJM component.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"domain": "pjm", | |
"name": "PJM Sensor", | |
"documentation": "", | |
"requirements": [], | |
"dependencies": [], | |
"codeowners": [], | |
"version": "1.0" | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Support for PJM data. | |
""" | |
import asyncio | |
from datetime import datetime as dt, date, time, timezone, timedelta | |
import logging | |
import aiohttp | |
import async_timeout | |
import voluptuous as vol | |
import urllib.parse | |
from homeassistant.components.sensor import PLATFORM_SCHEMA | |
from homeassistant.const import ( | |
CONF_NAME, CONF_TYPE, CONF_ZONE, CONF_MONITORED_VARIABLES) | |
from homeassistant.helpers.aiohttp_client import async_get_clientsession | |
import homeassistant.helpers.config_validation as cv | |
from homeassistant.helpers.entity import Entity | |
from homeassistant.util import Throttle | |
_LOGGER = logging.getLogger(__name__) | |
RESOURCE_INSTANTANEOUS = 'https://api.pjm.com/api/v1/inst_load' | |
RESOURCE_FORECAST = 'https://api.pjm.com/api/v1/load_frcstd_7_day' | |
RESOURCE_SUBSCRIPTION_KEY = 'https://dataminer2.pjm.com/config/settings.json' | |
# Don't set these too low or you'll get IP banned | |
MIN_TIME_BETWEEN_UPDATES_INSTANTANEOUS = timedelta(seconds=300) | |
MIN_TIME_BETWEEN_UPDATES_FORECAST = timedelta(seconds=1800) | |
PJM_RTO_ZONE = "PJM RTO" | |
FORECAST_COMBINED_ZONE = 'RTO_COMBINED' | |
ICON_POWER = 'mdi:flash' | |
CONF_INSTANTANEOUS_ZONE_LOAD = 'instantaneous_zone_load' | |
CONF_INSTANTANEOUS_TOTAL_LOAD = 'instantaneous_total_load' | |
CONF_ZONE_LOAD_FORECAST = 'zone_load_forecast' | |
CONF_TOTAL_LOAD_FORECAST = 'total_load_forecast' | |
SENSOR_TYPES = { | |
CONF_INSTANTANEOUS_ZONE_LOAD: ["PJM Instantaneous Zone Load", 'MW'], | |
CONF_INSTANTANEOUS_TOTAL_LOAD: ["PJM Instantaneous Total Load", 'MW'], | |
CONF_ZONE_LOAD_FORECAST: ["PJM Zone Load Forecast", 'MW'], | |
CONF_TOTAL_LOAD_FORECAST: ["PJM Total Load Forecast", 'MW'], | |
} | |
SENSORS_SCHEMA = vol.Schema({ | |
vol.Required(CONF_TYPE): vol.In(SENSOR_TYPES), | |
vol.Optional(CONF_ZONE): cv.string, | |
vol.Optional(CONF_NAME): cv.string, | |
}) | |
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ | |
vol.Required(CONF_MONITORED_VARIABLES): [SENSORS_SCHEMA], | |
}) | |
async def async_setup_platform(hass, config, async_add_devices, | |
discovery_info=None): | |
"""Set up the PJM sensor.""" | |
# Create one PJMData object shared between all sensors | |
pjm_data = PJMData(async_get_clientsession(hass)) | |
dev = [] | |
for variable in config[CONF_MONITORED_VARIABLES]: | |
sensor_type = variable[CONF_TYPE] | |
zone = variable.get(CONF_ZONE) | |
if sensor_type == CONF_INSTANTANEOUS_TOTAL_LOAD: | |
zone = PJM_RTO_ZONE | |
if sensor_type == CONF_TOTAL_LOAD_FORECAST: | |
zone = FORECAST_COMBINED_ZONE | |
dev.append(PJMSensor( | |
pjm_data, sensor_type, | |
zone, variable.get(CONF_NAME))) | |
async_add_devices(dev, True) | |
class PJMSensor(Entity): | |
"""Implementation of a PJM sensor.""" | |
def __init__(self, pjm_data, sensor_type, zone, name): | |
"""Initialize the sensor.""" | |
self._pjm_data = pjm_data | |
if name: | |
self._name = name | |
else: | |
self._name = SENSOR_TYPES[sensor_type][0] | |
if sensor_type == CONF_INSTANTANEOUS_ZONE_LOAD or sensor_type == CONF_ZONE_LOAD_FORECAST: | |
self._name += ' ' + zone | |
self._type = sensor_type | |
self._state = None | |
self._forecast_hour_ending = None | |
self._zone = zone | |
self._unit_of_measurement = SENSOR_TYPES[sensor_type][1] | |
@property | |
def name(self): | |
"""Return the name of the sensor.""" | |
return self._name | |
@property | |
def icon(self): | |
"""Icon to use in the frontend, if any.""" | |
return ICON_POWER | |
@property | |
def state(self): | |
"""Return the state of the sensor.""" | |
return self._state | |
@property | |
def unit_of_measurement(self): | |
"""Return the unit of measurement of this entity, if any.""" | |
return self._unit_of_measurement | |
@property | |
def extra_state_attributes(self): | |
attr = {} | |
if self._zone and self._type != CONF_TOTAL_LOAD_FORECAST: | |
attr["zone"] = self._zone | |
if self._type == CONF_TOTAL_LOAD_FORECAST or self._type == CONF_ZONE_LOAD_FORECAST: | |
attr["forecast_hour_ending"] = self._forecast_hour_ending | |
return attr | |
async def async_update(self): | |
"""Use the PJM data to set our state.""" | |
try: | |
if self._type == CONF_INSTANTANEOUS_ZONE_LOAD or self._type == CONF_INSTANTANEOUS_TOTAL_LOAD: | |
await self.update_load() | |
else: | |
await self.update_forecast() | |
except (ValueError, KeyError): | |
_LOGGER.error("Could not update status for %s", self._name) | |
except AttributeError as err: | |
_LOGGER.error("Could not update status for PJM: %s", err) | |
except TypeError: | |
# I believe this can happen if one of the function calls is throttled. Just ignore. | |
pass | |
except Exception as err: | |
_LOGGER.error("Unknown error for PJM: %s", err) | |
@Throttle(MIN_TIME_BETWEEN_UPDATES_INSTANTANEOUS) | |
async def update_load(self): | |
load = await self._pjm_data.async_update_instantaneous(self._zone) | |
if load is not None: | |
self._state = load | |
@Throttle(MIN_TIME_BETWEEN_UPDATES_FORECAST) | |
async def update_forecast(self): | |
load, forecast_hour_ending = await self._pjm_data.async_update_forecast(self._zone) | |
if load is not None: | |
self._state = load | |
if forecast_hour_ending is not None: | |
self._forecast_hour_ending = forecast_hour_ending | |
class PJMData(object): | |
"""Get and parse data from PJM.""" | |
def __init__(self, websession): | |
"""Initialize the data object.""" | |
self._websession = websession | |
self._subscription_key = None | |
def _get_headers(self): | |
return { | |
'Ocp-Apim-Subscription-Key': self._subscription_key, | |
'Origin': 'https://dataminer2.pjm.com', | |
'Referer': 'https://dataminer2.pjm.com/', | |
'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Opera";v="86"', | |
'sec-ch-ua-mobile': '?0', | |
'sec-ch-ua-platform': '"macOS"', | |
'Sec-Fetch-Dest': 'empty', | |
'Sec-Fetch-Mode': 'cors', | |
'Sec-Fetch-Site': 'same-site', | |
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36 OPR/86.0.4363.59', | |
} | |
async def _get_subscription_key(self): | |
_LOGGER.info("Attempting to get subscription key") | |
try: | |
with async_timeout.timeout(60): | |
response = await self._websession.get(RESOURCE_SUBSCRIPTION_KEY) | |
data = await response.json() | |
self._subscription_key = data['subscriptionKey'] | |
if self._subscription_key: | |
_LOGGER.info("Got subscription key") | |
except Exception as err: | |
_LOGGER.error("Could not get PJM subscription key: %s", err) | |
async def async_update_instantaneous(self, zone): | |
# If we don't have the subscription key we need to get it first | |
if not self._subscription_key: | |
await self._get_subscription_key() | |
# Search for data from the past 10 minutes | |
end_time_utc = dt.now().astimezone(timezone.utc) | |
start_time_utc = end_time_utc - timedelta(minutes=10) | |
time_string = start_time_utc.strftime('%m/%e/%Y %H:%Mto') + end_time_utc.strftime('%m/%e/%Y %H:%M') | |
params = { | |
'rowCount': '100', | |
'sort': 'datetime_beginning_utc', | |
'order': 'Desc', | |
'startRow': '1', | |
'isActiveMetadata': 'true', | |
'fields': 'area,instantaneous_load', | |
'datetime_beginning_utc': time_string, | |
} | |
resource = "{}?{}".format(RESOURCE_INSTANTANEOUS, urllib.parse.urlencode(params)) | |
headers = self._get_headers() | |
try: | |
with async_timeout.timeout(60): | |
response = await self._websession.get(resource, headers=headers) | |
data = await response.json() | |
if not data: | |
_LOGGER.error("No load data returned for zone %s", zone) | |
# No data; return | |
return None | |
items = data["items"] | |
# The data should already be sorted by date, so find the first instance of the zone we're looking for | |
for item in items: | |
if item["area"] == zone: | |
return int(round(item["instantaneous_load"])) | |
_LOGGER.error("Couldn't find load data for zone %s", zone) | |
return None | |
except (asyncio.TimeoutError, aiohttp.ClientError) as err: | |
_LOGGER.error("Could not get load data from PJM: %s", err) | |
# Try to get a new subscription key | |
await self._get_subscription_key() | |
return None | |
except Exception as err: | |
_LOGGER.error("Could not get load data from PJM: %s", err) | |
return None | |
async def async_update_forecast(self, zone): | |
# If we don't have the subscription key we need to get it first | |
if not self._subscription_key: | |
await self._get_subscription_key() | |
# Generate the time range to search for (12:00 - 23:59 today) | |
midnight_local = dt.combine(date.today(), time()) | |
start_time_utc = midnight_local.astimezone(timezone.utc) | |
end_time_utc = start_time_utc + timedelta(hours = 23, minutes=59) | |
time_string = start_time_utc.strftime('%m/%e/%Y %H:%Mto') + end_time_utc.strftime('%m/%e/%Y %H:%M') | |
params = { | |
'rowCount': '100', | |
'order': 'Asc', | |
'startRow': '1', | |
'isActiveMetadata': 'true', | |
'fields': 'forecast_datetime_ending_utc,forecast_load_mw', | |
'forecast_datetime_beginning_utc': time_string, | |
'forecast_area': zone, | |
} | |
resource = "{}?{}".format(RESOURCE_FORECAST, urllib.parse.urlencode(params)) | |
headers = self._get_headers() | |
try: | |
with async_timeout.timeout(60): | |
response = await self._websession.get(resource, headers=headers) | |
full_data = await response.json() | |
data = full_data["items"] | |
# Sort the data, first by highest forecast load value, and then by date (it's possible to have multiple hours with the same forecast load value and we want the earliest one) | |
sorted_data = sorted(data, key = lambda x: (x["forecast_load_mw"] * -1, dt.strptime(x['forecast_datetime_ending_utc'],'%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone.utc).astimezone(None))) | |
# Now that the data is sorted, the first item contains the highest forecast load of the day in MW and what hour it's supposed to happen | |
load = int(sorted_data[0]["forecast_load_mw"]) | |
# Get the hour that the forecast ends (in UTC), parse it, and convert it to local time | |
forecast_hour_ending = dt.strptime(sorted_data[0]['forecast_datetime_ending_utc'],'%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone.utc).astimezone(None) | |
return (load, forecast_hour_ending) | |
except (asyncio.TimeoutError, aiohttp.ClientError) as err: | |
_LOGGER.error("Could not get forecast data from PJM: %s", err) | |
# try to get a new subscription key | |
await self._get_subscription_key() | |
return (None, None) | |
except Exception as err: | |
_LOGGER.error("Could not get forecast data from PJM: %s", err) | |
return (None, None) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment