Skip to content

Instantly share code, notes, and snippets.

@joe248
Last active May 16, 2023 12:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joe248/1d8e42dd6904d480770f67f2bb640a64 to your computer and use it in GitHub Desktop.
Home Assistant PJM Load Data Sensor
"""The PJM component."""
{
"domain": "pjm",
"name": "PJM Sensor",
"documentation": "",
"requirements": [],
"dependencies": [],
"codeowners": [],
"version": "1.0"
}
"""
Support for PJM data.
"""
import asyncio
from datetime import datetime as dt, date, time, timezone, timedelta
import logging
import aiohttp
import async_timeout
import voluptuous as vol
import urllib.parse
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_TYPE, CONF_ZONE, CONF_MONITORED_VARIABLES)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
_LOGGER = logging.getLogger(__name__)
# PJM Data Miner 2 API endpoints. The settings.json endpoint serves the
# public subscription key used by the dataminer2 web frontend.
RESOURCE_INSTANTANEOUS = 'https://api.pjm.com/api/v1/inst_load'
RESOURCE_FORECAST = 'https://api.pjm.com/api/v1/load_frcstd_7_day'
RESOURCE_SUBSCRIPTION_KEY = 'https://dataminer2.pjm.com/config/settings.json'
# Don't set these too low or you'll get IP banned
MIN_TIME_BETWEEN_UPDATES_INSTANTANEOUS = timedelta(seconds=300)
MIN_TIME_BETWEEN_UPDATES_FORECAST = timedelta(seconds=1800)
# Pseudo-zone identifiers the PJM API uses for system-wide ("total") queries.
PJM_RTO_ZONE = "PJM RTO"
FORECAST_COMBINED_ZONE = 'RTO_COMBINED'
ICON_POWER = 'mdi:flash'
# Sensor type keys accepted under monitored_variables in configuration.yaml.
CONF_INSTANTANEOUS_ZONE_LOAD = 'instantaneous_zone_load'
CONF_INSTANTANEOUS_TOTAL_LOAD = 'instantaneous_total_load'
CONF_ZONE_LOAD_FORECAST = 'zone_load_forecast'
CONF_TOTAL_LOAD_FORECAST = 'total_load_forecast'
# Maps each sensor type to [default friendly name, unit of measurement].
SENSOR_TYPES = {
CONF_INSTANTANEOUS_ZONE_LOAD: ["PJM Instantaneous Zone Load", 'MW'],
CONF_INSTANTANEOUS_TOTAL_LOAD: ["PJM Instantaneous Total Load", 'MW'],
CONF_ZONE_LOAD_FORECAST: ["PJM Zone Load Forecast", 'MW'],
CONF_TOTAL_LOAD_FORECAST: ["PJM Total Load Forecast", 'MW'],
}
# Schema for one entry under monitored_variables: a required sensor type,
# plus an optional zone and an optional friendly-name override.
SENSORS_SCHEMA = vol.Schema({
vol.Required(CONF_TYPE): vol.In(SENSOR_TYPES),
vol.Optional(CONF_ZONE): cv.string,
vol.Optional(CONF_NAME): cv.string,
})
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_MONITORED_VARIABLES): [SENSORS_SCHEMA],
})
async def async_setup_platform(hass, config, async_add_devices,
                               discovery_info=None):
    """Set up the PJM sensor platform from YAML configuration."""
    # One shared PJMData object backs every configured sensor so the
    # subscription key is fetched only once.
    data = PJMData(async_get_clientsession(hass))
    sensors = []
    for entry in config[CONF_MONITORED_VARIABLES]:
        kind = entry[CONF_TYPE]
        zone = entry.get(CONF_ZONE)
        # "Total" sensors always query the system-wide pseudo-zones,
        # overriding any configured zone.
        if kind == CONF_INSTANTANEOUS_TOTAL_LOAD:
            zone = PJM_RTO_ZONE
        elif kind == CONF_TOTAL_LOAD_FORECAST:
            zone = FORECAST_COMBINED_ZONE
        sensors.append(PJMSensor(data, kind, zone, entry.get(CONF_NAME)))
    async_add_devices(sensors, True)
class PJMSensor(Entity):
    """Representation of a single PJM load or forecast sensor."""

    def __init__(self, pjm_data, sensor_type, zone, name):
        """Initialize the sensor."""
        self._pjm_data = pjm_data
        self._name = name if name else SENSOR_TYPES[sensor_type][0]
        # Zone-specific sensors carry their zone in the friendly name so
        # multiple zones can coexist.
        if sensor_type in (CONF_INSTANTANEOUS_ZONE_LOAD, CONF_ZONE_LOAD_FORECAST):
            self._name += ' ' + zone
        self._type = sensor_type
        self._state = None
        self._forecast_hour_ending = None
        self._zone = zone
        self._unit_of_measurement = SENSOR_TYPES[sensor_type][1]

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def icon(self):
        """Icon to use in the frontend, if any."""
        return ICON_POWER

    @property
    def state(self):
        """Return the state of the sensor."""
        return self._state

    @property
    def unit_of_measurement(self):
        """Return the unit of measurement of this entity, if any."""
        return self._unit_of_measurement

    @property
    def extra_state_attributes(self):
        """Expose the zone and the forecast peak hour as attributes."""
        attributes = {}
        if self._zone and self._type != CONF_TOTAL_LOAD_FORECAST:
            attributes["zone"] = self._zone
        if self._type in (CONF_TOTAL_LOAD_FORECAST, CONF_ZONE_LOAD_FORECAST):
            attributes["forecast_hour_ending"] = self._forecast_hour_ending
        return attributes

    async def async_update(self):
        """Use the PJM data to set our state."""
        try:
            if self._type in (CONF_INSTANTANEOUS_ZONE_LOAD,
                              CONF_INSTANTANEOUS_TOTAL_LOAD):
                await self.update_load()
            else:
                await self.update_forecast()
        except (ValueError, KeyError):
            _LOGGER.error("Could not update status for %s", self._name)
        except AttributeError as err:
            _LOGGER.error("Could not update status for PJM: %s", err)
        except TypeError:
            # A throttled call returns None instead of a coroutine, which
            # makes the await raise TypeError; that's expected, ignore it.
            pass
        except Exception as err:
            _LOGGER.error("Unknown error for PJM: %s", err)

    @Throttle(MIN_TIME_BETWEEN_UPDATES_INSTANTANEOUS)
    async def update_load(self):
        """Fetch the latest instantaneous load reading (rate limited)."""
        load = await self._pjm_data.async_update_instantaneous(self._zone)
        if load is not None:
            self._state = load

    @Throttle(MIN_TIME_BETWEEN_UPDATES_FORECAST)
    async def update_forecast(self):
        """Fetch today's peak load forecast (rate limited)."""
        load, hour_ending = await self._pjm_data.async_update_forecast(self._zone)
        if load is not None:
            self._state = load
        if hour_ending is not None:
            self._forecast_hour_ending = hour_ending
class PJMData(object):
    """Get and parse load data from the PJM Data Miner 2 API.

    Shared by all sensors; fetches and caches the public subscription key
    the dataminer2 frontend uses, and exposes one method per data feed.
    """

    def __init__(self, websession):
        """Initialize the data object.

        :param websession: aiohttp ClientSession used for all requests.
        """
        self._websession = websession
        self._subscription_key = None

    def _get_headers(self):
        """Build browser-like request headers for the PJM API.

        The API expects the subscription key plus headers resembling the
        dataminer2 web frontend; requests without them are rejected.
        """
        return {
            'Ocp-Apim-Subscription-Key': self._subscription_key,
            'Origin': 'https://dataminer2.pjm.com',
            'Referer': 'https://dataminer2.pjm.com/',
            'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Opera";v="86"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-site',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36 OPR/86.0.4363.59',
        }

    @staticmethod
    def _parse_hour_ending(row):
        """Parse a forecast row's UTC hour-ending timestamp to local time."""
        return (dt.strptime(row['forecast_datetime_ending_utc'],
                            '%Y-%m-%dT%H:%M:%S')
                .replace(tzinfo=timezone.utc)
                .astimezone(None))

    async def _get_subscription_key(self):
        """Fetch and cache the public API subscription key from dataminer2."""
        _LOGGER.info("Attempting to get subscription key")
        try:
            # async_timeout must be entered with `async with`; the sync
            # `with` form was removed in async_timeout 4.0 and raises.
            async with async_timeout.timeout(60):
                response = await self._websession.get(RESOURCE_SUBSCRIPTION_KEY)
                data = await response.json()
            self._subscription_key = data['subscriptionKey']
            if self._subscription_key:
                _LOGGER.info("Got subscription key")
        except Exception as err:
            _LOGGER.error("Could not get PJM subscription key: %s", err)

    async def async_update_instantaneous(self, zone):
        """Return the most recent instantaneous load (MW) for *zone*.

        Returns None when the data could not be retrieved or the zone is
        absent from the response.
        """
        # If we don't have the subscription key we need to get it first.
        if not self._subscription_key:
            await self._get_subscription_key()
        # Search for data from the past 10 minutes.
        end_time_utc = dt.now(timezone.utc)
        start_time_utc = end_time_utc - timedelta(minutes=10)
        # NOTE: %e (space-padded day) is a glibc extension matching what the
        # dataminer2 frontend sends; it is not portable to Windows.
        time_string = (start_time_utc.strftime('%m/%e/%Y %H:%Mto')
                       + end_time_utc.strftime('%m/%e/%Y %H:%M'))
        params = {
            'rowCount': '100',
            'sort': 'datetime_beginning_utc',
            'order': 'Desc',
            'startRow': '1',
            'isActiveMetadata': 'true',
            'fields': 'area,instantaneous_load',
            'datetime_beginning_utc': time_string,
        }
        resource = "{}?{}".format(RESOURCE_INSTANTANEOUS,
                                  urllib.parse.urlencode(params))
        headers = self._get_headers()
        try:
            async with async_timeout.timeout(60):
                response = await self._websession.get(resource, headers=headers)
                data = await response.json()
            if not data:
                _LOGGER.error("No load data returned for zone %s", zone)
                return None
            # Rows are sorted newest-first, so the first match for the zone
            # is the latest reading.
            for item in data["items"]:
                if item["area"] == zone:
                    return int(round(item["instantaneous_load"]))
            _LOGGER.error("Couldn't find load data for zone %s", zone)
            return None
        except (asyncio.TimeoutError, aiohttp.ClientError) as err:
            _LOGGER.error("Could not get load data from PJM: %s", err)
            # The cached key may have expired; refresh for the next attempt.
            await self._get_subscription_key()
            return None
        except Exception as err:
            _LOGGER.error("Could not get load data from PJM: %s", err)
            return None

    async def async_update_forecast(self, zone):
        """Return today's peak forecast for *zone*.

        :returns: tuple (peak load in MW, local datetime of the hour ending
            at the peak), or (None, None) on failure.
        """
        # If we don't have the subscription key we need to get it first.
        if not self._subscription_key:
            await self._get_subscription_key()
        # Query the 24-hour window starting at local midnight today.
        midnight_local = dt.combine(date.today(), time())
        start_time_utc = midnight_local.astimezone(timezone.utc)
        end_time_utc = start_time_utc + timedelta(hours=23, minutes=59)
        time_string = (start_time_utc.strftime('%m/%e/%Y %H:%Mto')
                       + end_time_utc.strftime('%m/%e/%Y %H:%M'))
        params = {
            'rowCount': '100',
            'order': 'Asc',
            'startRow': '1',
            'isActiveMetadata': 'true',
            'fields': 'forecast_datetime_ending_utc,forecast_load_mw',
            'forecast_datetime_beginning_utc': time_string,
            'forecast_area': zone,
        }
        resource = "{}?{}".format(RESOURCE_FORECAST,
                                  urllib.parse.urlencode(params))
        headers = self._get_headers()
        try:
            async with async_timeout.timeout(60):
                response = await self._websession.get(resource, headers=headers)
                full_data = await response.json()
            items = full_data["items"]
            # Pick the highest forecast load of the day; when several hours
            # tie, the earliest hour wins (hence the negated-load key).
            peak = min(items,
                       key=lambda row: (-row["forecast_load_mw"],
                                        self._parse_hour_ending(row)))
            return (int(peak["forecast_load_mw"]), self._parse_hour_ending(peak))
        except (asyncio.TimeoutError, aiohttp.ClientError) as err:
            _LOGGER.error("Could not get forecast data from PJM: %s", err)
            # The cached key may have expired; refresh for the next attempt.
            await self._get_subscription_key()
            return (None, None)
        except Exception as err:
            _LOGGER.error("Could not get forecast data from PJM: %s", err)
            return (None, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment