Skip to content

Instantly share code, notes, and snippets.

@viiru-
Last active June 2, 2020 12:38
Show Gist options
  • Save viiru-/4da114fe94eb2cea2f4f7c708e8ef3b6 to your computer and use it in GitHub Desktop.
Save viiru-/4da114fe94eb2cea2f4f7c708e8ef3b6 to your computer and use it in GitHub Desktop.
Script for loading electricity consumption data from several local power companies into Influxdb
#!/usr/bin/python3
import sys
import asyncio
import json
import datetime
from urllib.parse import urljoin
from typing import NamedTuple
import aiohttp
import bs4
import pytz
import yaml
from aioinflux import InfluxDBClient, lineprotocol, TIMEDT, MEASUREMENT, TAG, FLOAT
FORM_URL = '/eServices/Online/IndexNoAuth'
LOGIN_URL = '/eServices/Online/Login'
USAGE_URL = '/Reporting/CustomerConsumption/GetHourlyConsumption'
LOCALTIME = pytz.timezone('Europe/Helsinki')
@lineprotocol
class PowerUsage(NamedTuple):
"""Definition of the Influxdb power measurement schema"""
timestamp: TIMEDT
total_usage: FLOAT
tariff: TAG
name: MEASUREMENT = 'power_usage'
@lineprotocol
class Temperature(NamedTuple):
"""Definition of the Influxdb temperature measurement schema"""
timestamp: TIMEDT
temperature: FLOAT
temperature_location: TAG
name: MEASUREMENT = 'temperature'
async def get_login_token(session, config):
"""Parse CSRF token from the login form"""
async with session.get(urljoin(config['base_url'], FORM_URL)) as login_form:
login_form.raise_for_status()
content = await login_form.text()
soup = bs4.BeautifulSoup(content, features='html.parser')
form_html = soup.find(id='loginform')
token = form_html.find('input', attrs={'name': '__RequestVerificationToken'})['value']
return token
async def login(session, config):
"""Login to the service, sets cookies as result"""
token = await get_login_token(session, config)
login_data = {'UserName': config['username'],
'Password': config['password'],
'__RequestVerificationToken': token}
async with session.post(urljoin(config['base_url'], LOGIN_URL), data=login_data) as resp:
resp.raise_for_status()
return resp
async def fetch_consumption(session, config):
"""Retrieves and parses JSON consumption data"""
params = {'customerCode': config['customerCode'],
'networkCode': config['networkCode'],
'meteringPointCode': config['meteringPointCode'],
'enableTemperature': 'true',
'enablePriceSeries': 'false',
'enableTemperatureCorrectedConsumption': 'true',
'mpSourceCompanyCode': '',
'activeTarificationId': ''}
async with session.post(urljoin(config['base_url'], USAGE_URL), data=params) as resp:
resp.raise_for_status()
json_data = await resp.text()
return json.loads(json_data)
def convert_timestamp(timestamp):
"""From milliseconds to seconds after epoch, and mark as local time.
Strange format, milliseconds from the epoch but not in UTC."""
dt = datetime.datetime.utcfromtimestamp(timestamp / 1000)
dt = LOCALTIME.localize(dt)
return dt
def process_consumptions(data):
"""Parses consumption data from the JSON response, and yields
PowerUsage objects for loading into Influxdb"""
for series in data:
tariff = series['Series']['Name']
for timestamp, usage in series['Series']['Data']:
measurement = PowerUsage(timestamp=convert_timestamp(timestamp),
tariff=tariff,
total_usage=usage)
yield measurement
def process_temperature(data):
"""Parses temperature data from the JSON response, and yields
Temperature objects for loading into Influxdb"""
temperature_location = data['Name']
for timestamp, temperature in data['Data']:
measurement = Temperature(timestamp=convert_timestamp(timestamp),
temperature=temperature,
temperature_location=temperature_location)
yield measurement
async def load_influx(data, config):
"""Loads parsed PowerUsage objects into Influxdb"""
async with InfluxDBClient(db=config['database'],
username=config['username'],
password=config['password']) as client:
consumptions = client.write(process_consumptions(data['Consumptions']))
temperature = client.write(process_temperature(data['Temperature']))
await asyncio.gather(consumptions, temperature)
async def main(config):
async with aiohttp.ClientSession() as session:
await login(session, config['service'])
data = await fetch_consumption(session, config['service'])
await load_influx(data, config['influxdb'])
series1_end = data['Consumptions'][0]['Series']['Data'][-1]
series1_name = data['Consumptions'][0]['Series']['Name']
series2_end = data['Consumptions'][1]['Series']['Data'][-1]
series2_name = data['Consumptions'][1]['Series']['Name']
print(series1_name, convert_timestamp(series1_end[0]), series1_end[1])
print(series2_name, convert_timestamp(series2_end[0]), series2_end[1])
print('Temperature', data['Temperature']['Name'],
convert_timestamp(data['Temperature']['Data'][-1][0]),
data['Temperature']['Data'][-1][1])
if __name__ == '__main__':
with open(sys.argv[1]) as config_file:
orig_config = yaml.safe_load(config_file)
loop = asyncio.get_event_loop()
loop.run_until_complete(main(orig_config))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment