Skip to content

Instantly share code, notes, and snippets.

@Actpohomoc
Forked from kennedyshead/elspot.py
Last active June 9, 2020 20:31
Show Gist options
  • Save Actpohomoc/c32096586d7133152b97cb7347443224 to your computer and use it in GitHub Desktop.
Save Actpohomoc/c32096586d7133152b97cb7347443224 to your computer and use it in GitHub Desktop.
{
"domain": "elspot",
"name": "elspot",
"documentation": "https://gist.github.com/kennedyshead/e341a855fd77974e2b87fd52d90eb823",
"dependencies": [],
"codeowners": ["@kennedyshead"],
"requirements": []
}
"""
Custom component that reads electricity spot prices from Vattenfall's price API and exposes them as a sensor.
"""
import logging
from datetime import datetime, timedelta
import pytz
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import CONF_REGION
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
# Platform configuration: the price region is the only option and is mandatory.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {vol.Required(CONF_REGION): cv.string}
)

_LOGGER = logging.getLogger(__name__)

# Minimum time between two price refreshes (used to throttle the sensor update).
SCAN_INTERVAL = timedelta(hours=1)
class Elspot:
    """Small async helper around the Vattenfall spot price endpoint."""

    # Placeholders, in order: (start, stop, region)
    URL = "https://www.vattenfall.se/api/price/spot/pricearea/%s/%s/%s"

    @staticmethod
    async def fetch(session, url):
        """GET *url* through *session* and return the body parsed as JSON."""
        import json

        async with session.get(url) as response:
            body = await response.text()
            return json.loads(body)

    @staticmethod
    async def get_data(url):
        """Open a short-lived aiohttp session and return the JSON found at *url*."""
        import aiohttp

        async with aiohttp.ClientSession() as session:
            return await Elspot.fetch(session, url)
async def async_setup_platform(
        hass, config, async_add_entities, discovery_info=None):
    """Fetch today's prices for the configured region and register the sensor."""
    region = config[CONF_REGION]
    stockholm = pytz.timezone('Europe/Stockholm')
    today = datetime.now(stockholm).strftime("%Y-%m-%d")
    # The same date is passed as both start and stop of the requested range.
    prices = await Elspot.get_data(Elspot.URL % (today, today, region))
    async_add_entities([ElspotSensor(prices, region)])
class ElspotSensor(Entity):
    """Sensor exposing the spot price entry for the current Stockholm hour.

    The fetched payload is indexed by hour of day: ``data[hour]`` is expected
    to be a mapping with ``'Value'`` and ``'Unit'`` keys.
    """

    def __init__(self, data, region):
        """Create the sensor from an already fetched payload.

        Args:
            data: Payload as returned by ``Elspot.get_data``.
            region: Price region; used in the entity name and in later
                requests.
        """
        self._name = 'Elspotpris %s' % region
        self._region = region
        self._state = None
        self._unit = None
        self._apply(data)

    def _apply(self, data):
        """Copy the current hour's value and unit from *data* onto the sensor.

        If the entry is missing or malformed the state is cleared and the
        previously known unit is left untouched.
        """
        # Read the hour once so value and unit always come from the same entry,
        # even if the clock crosses an hour boundary while we are here.
        hour = self.hour
        try:
            entry = data[hour]
            value, unit = entry['Value'], entry['Unit']
        except (IndexError, KeyError, TypeError):
            # Covers a short list, a dict/None payload (e.g. an error
            # response) and entries lacking the expected keys.
            _LOGGER.warning(
                "No usable spot price for hour %s in region %s",
                hour, self._region)
            self._state = None
            return
        self._state = value
        self._unit = unit

    @property
    def hour(self):
        """Return the current hour of day (0-23) in Europe/Stockholm."""
        return datetime.now(pytz.timezone('Europe/Stockholm')).hour

    @property
    def unit_of_measurement(self):
        """Return the unit reported alongside the price, if any."""
        return self._unit

    @property
    def name(self):
        """Return the entity name."""
        return self._name

    @property
    def state(self):
        """Return the price for the current hour, or None when unavailable."""
        return self._state

    @Throttle(SCAN_INTERVAL)
    async def async_update(self):
        """Fetch today's prices for the region and refresh state and unit."""
        today = datetime.now(
            pytz.timezone('Europe/Stockholm')).strftime("%Y-%m-%d")
        url = Elspot.URL % (today, today, self._region)
        self._apply(await Elspot.get_data(url))
@Actpohomoc
Copy link
Author

Nice working code!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment