|
"""Platfor for sensor integration.""" |
|
import asyncio |
|
import collections |
|
import logging |
|
import json |
|
|
|
import aiohttp |
|
import voluptuous as vol |
|
|
|
from homeassistant.components.sensor import PLATFORM_SCHEMA |
|
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME, ENERGY_KILO_WATT_HOUR |
|
from homeassistant.core import callback |
|
import homeassistant.helpers.config_validation as cv |
|
from homeassistant.helpers.entity import Entity |
|
|
|
from .const import * |
|
|
|
_LOGGER = logging.getLogger(__name__) |
|
|
|
# Validation of the user's configuration |
|
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( |
|
{ |
|
vol.Optional(CONF_USERNAME): cv.string, |
|
vol.Optional(CONF_PASSWORD): cv.string, |
|
} |
|
) |
|
|
|
DOMAIN = 'chargeamps' |
|
|
|
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None): |
|
"""Set up the sensor platform.""" |
|
# Assign configuration variables. |
|
# The configuration check takes care they are present. |
|
username = config[CONF_USERNAME] |
|
password = config.get(CONF_PASSWORD) |
|
|
|
# Setup connection with devices/cloud |
|
authenticate_info = await ChargeAmps.authenticate(username, password) |
|
|
|
# Verify that passed in configuration works |
|
if not authenticate_info: |
|
_LOGGER.error("Could not authenticate.") |
|
return |
|
|
|
# Add devices |
|
chargeamps = [ChargeAmps(authenticate_info, username, password)] |
|
hass.data[DOMAIN] = chargeamps |
|
async_add_entities(chargeamps, True) |
|
|
|
@callback |
|
def service_settings(call): |
|
"""Handle service call.""" |
|
settings = call.data.get("setting", "{}") |
|
entity_id = call.data.get("entity_id", "") |
|
_LOGGER.warning("Sending settings: {} to {}".format(settings, entity_id)) |
|
hass.loop.create_task(hass.data[DOMAIN][0].send(settings)) |
|
|
|
|
|
hass.services.async_register(DOMAIN, "change_settings", service_settings) |
|
|
|
|
|
class ChargeAmps(Entity): |
|
"""Representation of an ChargeAmps.""" |
|
|
|
def __init__(self, authenticate_info, username, password): |
|
"""Initialize an AwesomeLight.""" |
|
self._authenticate_info = authenticate_info |
|
self._username = username |
|
self._password = password |
|
self._state = {'owned': [{}], 'stats': {}} |
|
|
|
@staticmethod |
|
async def authenticate(username, password): |
|
"""Authenticate user.""" |
|
async with aiohttp.ClientSession() as session: |
|
async with session.post( |
|
AUTH_URL, headers=HEADERS, data=AUTH_STRING.format(username, password), |
|
) as resp: |
|
if resp.status == 200: |
|
return await resp.json() |
|
return False |
|
@property |
|
def unique_id(self): |
|
"""Return the unique id.""" |
|
return self._state["owned"][0].get("id", "ChargeAmps") |
|
|
|
@property |
|
def name(self): |
|
"""Return the display name of this light.""" |
|
return self._state["owned"][0].get("name", "ChargeAmps") |
|
|
|
@property |
|
def state(self): |
|
"""Return the state of the sensor.""" |
|
return self._state["stats"].get("totalConsumptionFromDayOne") |
|
|
|
@property |
|
def unit_of_measurement(self): |
|
"""Return the unit of measurement.""" |
|
return ENERGY_KILO_WATT_HOUR |
|
|
|
@property |
|
def icon(self): |
|
"""Return icon.""" |
|
return "mdi:flash" |
|
|
|
@property |
|
def device_state_attributes(self): |
|
"""Return status of device.""" |
|
args = {} |
|
args['id'] = self.entity_id |
|
_LOGGER.debug(self._state["owned"]) |
|
for i, connector in enumerate(self._state["owned"][0]["connectors"]): |
|
args[f'Connector {i}'] = connector["mode"] |
|
args[f'Connector {i} state'] = connector["evConnectorState"] |
|
args['Light'] = self._state["owned"][0]["downLight"] |
|
args['LED Brightness'] = self._state["owned"][0]["dimmer"] |
|
if "connectors" in self._state["owned"][0]: |
|
connector = self._state["owned"][0]["connectors"][0] |
|
args['userCurrent'] = connector['userCurrent'] |
|
args['totalConsumptionKwh'] = round(connector['totalConsumptionKwh'], 3) |
|
args['installationPhase'] = connector['installationPhase'] |
|
effect = 0 |
|
for i in (1, 2, 3): |
|
effect += connector["voltage{}".format(i)] * connector["current{}".format(i)] |
|
args[f"voltage_L{i}".title()] = connector[f"voltage{i}"] |
|
args[f"current_L{i}".title()] = connector[f"current{i}"] |
|
args['Energy'] = round(effect, 1) |
|
|
|
return args |
|
|
|
|
|
async def _async_update(self): |
|
"""Update states.""" |
|
async with aiohttp.ClientSession() as session: |
|
headers = { |
|
"Accept": "application/json, text/plain, */*", |
|
"Content-Type": "application/json;charset=UTF-8", |
|
"Authorization": "Token {}".format(self._authenticate_info["token"]), |
|
} |
|
uid = self._authenticate_info["user"]["id"] |
|
res = await asyncio.gather( |
|
session.get(OWNED_CHARGEPOINTS_URL.format(uid=uid), headers=headers,), |
|
session.get(STATS_URL.format(uid=uid), headers=headers,), |
|
) |
|
if (res[0].status == 200) and (res[1].status == 200): |
|
self._state["owned"] = await res[0].json() |
|
self._state["stats"] = await res[1].json() |
|
return 200 |
|
return res[0].status |
|
|
|
async def async_update(self, **kwargs): |
|
"""Update states.""" |
|
status = await self._async_update() |
|
if status == 401: |
|
self._authenticate_info = await self.authenticate(self._username, self._password) |
|
# print("Token: %s" % auth["token"]) |
|
await self._async_update() |
|
|
|
async def send(self, settings): |
|
"""Send command to chargeamps.""" |
|
def deep_update(source, overrides): |
|
""" |
|
Update a nested dictionary or similar mapping. |
|
Modify ``source`` in place. |
|
""" |
|
for key, value in overrides.items(): |
|
if isinstance(value, collections.Mapping) and value: |
|
returned = deep_update(source.get(key, {}), value) |
|
source[key] = returned |
|
elif isinstance(value, list): |
|
returned = [] |
|
for i, val in enumerate(source.get(key, [])): |
|
if i < len(value): |
|
returned.append(deep_update(val, value[i])) |
|
source[key] = returned |
|
else: |
|
source[key] = overrides[key] |
|
return source |
|
settings = json.loads(settings) |
|
owned = deep_update(self._state["owned"][0].copy(), settings) |
|
_LOGGER.debug("Sending settings: %s", owned) |
|
async with aiohttp.ClientSession() as session: |
|
headers = { |
|
"Accept": "application/json, text/plain, */*", |
|
"Content-Type": "application/json;charset=UTF-8", |
|
"Authorization": "Token {}".format(self._authenticate_info["token"]), |
|
} |
|
res = await session.put(CHARGEPOINTS_URL, json=owned, headers=headers,) |
|
if res.status == 200: |
|
_LOGGER.debug("OK") |
|
self.async_schedule_update_ha_state(force_refresh=True) |
|
else: |
|
await _LOGGER.warning(res.text()) |